您的位置:首页 > 编程语言

RTMPdump(libRTMP) 源代码分析 9: 接收消息(Message)(接收视音频数据)

2016-07-18 16:11 399 查看

函数调用结构图

RTMPDump (libRTMP)的整体的函数调用结构图如下图所示。





单击查看大图

详细分析

前一篇文章分析了RTMPdump(libRTMP) 的发送消息(Message)方面的源代码:RTMPdump(libRTMP) 源代码分析 8: 发送消息(Message)

在这里在研究研究接收消息(Message)的源代码,接收消息最典型的应用就是接收视音频数据了,因为视频和音频分别都属于RTMP协议规范中的一种消息。在这里主要分析接收视音频数据。

RTMPdump中完成视音频数据的接收(也可以说是视音频数据的下载)的函数是:RTMP_Read()。

RTMPdump主程序中的Download()函数就是通过调用RTMP_Read()完成数据接收,从而实现下载的。

那么我们马上开始吧,首先看看RTMP_Read()函数:

[cpp]
view plain
copy

//FLV文件头  
static const char flvHeader[] = { 'F', 'L', 'V', 0x01,  
  0x00,             /* 0x04代表有音频, 0x01代表有视频 */  
  0x00, 0x00, 0x00, 0x09,  
  0x00, 0x00, 0x00, 0x00  
};  
  
#define HEADERBUF   (128*1024)  
int  
RTMP_Read(RTMP *r, char *buf, int size)  
{  
  int nRead = 0, total = 0;  
  
  /* can't continue */  
fail:  
  switch (r->m_read.status) {  
  case RTMP_READ_EOF:  
  case RTMP_READ_COMPLETE:  
    return 0;  
  case RTMP_READ_ERROR:  /* corrupted stream, resume failed */  
    SetSockError(EINVAL);  
    return -1;  
  default:  
    break;  
  }  
  
  /* first time thru */  
  if (!(r->m_read.flags & RTMP_READ_HEADER))  
    {  
      if (!(r->m_read.flags & RTMP_READ_RESUME))  
    {  
    //分配内存,指向buf的首部和尾部  
      char *mybuf = (char *) malloc(HEADERBUF), *end = mybuf + HEADERBUF;  
      int cnt = 0;  
      //buf指向同一地址  
      r->m_read.buf = mybuf;  
      r->m_read.buflen = HEADERBUF;  
  
      //把Flv的首部复制到mybuf指向的内存  
      //RTMP传递的多媒体数据是“砍头”的FLV文件  
      memcpy(mybuf, flvHeader, sizeof(flvHeader));  
      //m_read.buf指针后移flvheader个单位  
      r->m_read.buf += sizeof(flvHeader);  
      //buf长度增加flvheader长度  
      r->m_read.buflen -= sizeof(flvHeader);  
      //timestamp=0,不是多媒体数据  
      while (r->m_read.timestamp == 0)  
        {  
        //读取一个Packet,到r->m_read.buf  
        //nRead为读取结果标记  
          nRead = Read_1_Packet(r, r->m_read.buf, r->m_read.buflen);  
          //有错误  
          if (nRead < 0)  
        {  
          free(mybuf);  
          r->m_read.buf = NULL;  
          r->m_read.buflen = 0;  
          r->m_read.status = nRead;  
          goto fail;  
        }  
          /* buffer overflow, fix buffer and give up */  
          if (r->m_read.buf < mybuf || r->m_read.buf > end) {  
            mybuf = (char *) realloc(mybuf, cnt + nRead);  
        memcpy(mybuf+cnt, r->m_read.buf, nRead);  
        r->m_read.buf = mybuf+cnt+nRead;  
            break;  
          }  
          //  
          //记录读取的字节数  
          cnt += nRead;  
          //m_read.buf指针后移nRead个单位  
          r->m_read.buf += nRead;  
          r->m_read.buflen -= nRead;  
          //当dataType=00000101时,即有视频和音频时  
          //说明有多媒体数据了  
          if (r->m_read.dataType == 5)  
            break;  
        }  
      //读入数据类型  
      //注意:mybuf指针位置一直没动  
      //mybuf[4]中第 6 位表示是否存在音频Tag。第 8 位表示是否存在视频Tag。   
      mybuf[4] = r->m_read.dataType;  
      //两个指针之间的差  
      r->m_read.buflen = r->m_read.buf - mybuf;  
      r->m_read.buf = mybuf;  
      //这句很重要!后面memcopy  
      r->m_read.bufpos = mybuf;  
    }  
      //flags标明已经读完了文件头  
      r->m_read.flags |= RTMP_READ_HEADER;  
    }  
  
  if ((r->m_read.flags & RTMP_READ_SEEKING) && r->m_read.buf)  
    {  
      /* drop whatever's here */  
      free(r->m_read.buf);  
      r->m_read.buf = NULL;  
      r->m_read.bufpos = NULL;  
      r->m_read.buflen = 0;  
    }  
  
  /* If there's leftover data buffered, use it up */  
  if (r->m_read.buf)  
    {  
      nRead = r->m_read.buflen;  
      if (nRead > size)  
    nRead = size;  
      //m_read.bufpos指向mybuf  
      memcpy(buf, r->m_read.bufpos, nRead);  
      r->m_read.buflen -= nRead;  
      if (!r->m_read.buflen)  
    {  
      free(r->m_read.buf);  
      r->m_read.buf = NULL;  
      r->m_read.bufpos = NULL;  
    }  
      else  
    {  
      r->m_read.bufpos += nRead;  
    }  
      buf += nRead;  
      total += nRead;  
      size -= nRead;  
    }  
  //接着读  
  while (size > 0 && (nRead = Read_1_Packet(r, buf, size)) >= 0)  
    {  
      if (!nRead) continue;  
      buf += nRead;  
      total += nRead;  
      size -= nRead;  
      break;  
    }  
  if (nRead < 0)  
    r->m_read.status = nRead;  
  
  if (size < 0)  
    total += size;  
  return total;  
}  

程序关键的地方都已经注释上了代码,在此就不重复说明了。有一点要提一下:RTMP传送的视音频数据的格式和FLV(FLash Video)格式是一样的,把接收下来的数据直接存入文件就可以了。但是这些视音频数据没有文件头,是纯视音频数据,因此需要在其前面加上FLV格式的文件头,这样得到的数据存成文件后才能被一般的视频播放器所播放。FLV格式的文件头是13个字节,如代码中所示。

RTMP_Read()中实际读取数据的函数是Read_1_Packet(),它的功能是从网络上读取一个RTMPPacket的数据,来看看它的源代码吧:

[cpp]
view plain
copy

/* 从流媒体中读取多媒体packet。 
 * Returns -3 if Play.Close/Stop, -2 if fatal error, -1 if no more media 
 * packets, 0 if ignorable error, >0 if there is a media packet 
 */  
static int  
Read_1_Packet(RTMP *r, char *buf, unsigned int buflen)  
{  
  uint32_t prevTagSize = 0;  
  int rtnGetNextMediaPacket = 0, ret = RTMP_READ_EOF;  
  RTMPPacket packet = { 0 };  
  int recopy = FALSE;  
  unsigned int size;  
  char *ptr, *pend;  
  uint32_t nTimeStamp = 0;  
  unsigned int len;  
  //获取下一个packet  
  rtnGetNextMediaPacket = RTMP_GetNextMediaPacket(r, &packet);  
  while (rtnGetNextMediaPacket)  
    {  
      char *packetBody = packet.m_body;  
      unsigned int nPacketLen = packet.m_nBodySize;  
  
      /* Return -3 if this was completed nicely with invoke message 
       * Play.Stop or Play.Complete 
       */  
      if (rtnGetNextMediaPacket == 2)  
    {  
      RTMP_Log(RTMP_LOGDEBUG,  
          "Got Play.Complete or Play.Stop from server. "  
          "Assuming stream is complete");  
      ret = RTMP_READ_COMPLETE;  
      break;  
    }  
      //设置dataType  
      r->m_read.dataType |= (((packet.m_packetType == 0x08) << 2) |  
                 (packet.m_packetType == 0x09));  
      //MessageID为9时,为视频数据,数据太小时。。。  
      if (packet.m_packetType == 0x09 && nPacketLen <= 5)  
    {  
      RTMP_Log(RTMP_LOGDEBUG, "ignoring too small video packet: size: %d",  
          nPacketLen);  
      ret = RTMP_READ_IGNORE;  
      break;  
    }  
      //MessageID为8时,为音频数据,数据太小时。。。  
      if (packet.m_packetType == 0x08 && nPacketLen <= 1)  
    {  
      RTMP_Log(RTMP_LOGDEBUG, "ignoring too small audio packet: size: %d",  
          nPacketLen);  
      ret = RTMP_READ_IGNORE;  
      break;  
    }  
  
      if (r->m_read.flags & RTMP_READ_SEEKING)  
    {  
      ret = RTMP_READ_IGNORE;  
      break;  
    }  
#ifdef _DEBUG  
      RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, TS: %d ms, abs TS: %d",  
      packet.m_packetType, nPacketLen, packet.m_nTimeStamp,  
      packet.m_hasAbsTimestamp);  
      if (packet.m_packetType == 0x09)  
    RTMP_Log(RTMP_LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0));  
#endif  
  
      if (r->m_read.flags & RTMP_READ_RESUME)  
    {  
      /* check the header if we get one */  
        //此类packet的timestamp都是0  
      if (packet.m_nTimeStamp == 0)  
        {  
        //messageID=18,数据消息(AMF0)  
          if (r->m_read.nMetaHeaderSize > 0  
          && packet.m_packetType == 0x12)  
        {  
        //获取metadata  
          AMFObject metaObj;  
          int nRes =  
            AMF_Decode(&metaObj, packetBody, nPacketLen, FALSE);  
          if (nRes >= 0)  
            {  
              AVal metastring;  
              AMFProp_GetString(AMF_GetProp(&metaObj, NULL, 0),  
                    &metastring);  
  
              if (AVMATCH(&metastring, &av_onMetaData))  
            {  
              /* compare */  
              if ((r->m_read.nMetaHeaderSize != nPacketLen) ||  
                  (memcmp  
                   (r->m_read.metaHeader, packetBody,  
                r->m_read.nMetaHeaderSize) != 0))  
                {  
                  ret = RTMP_READ_ERROR;  
                }  
            }  
              AMF_Reset(&metaObj);  
              if (ret == RTMP_READ_ERROR)  
            break;  
            }  
        }  
  
          /* check first keyframe to make sure we got the right position 
           * in the stream! (the first non ignored frame) 
           */  
          if (r->m_read.nInitialFrameSize > 0)  
        {  
          /* video or audio data */  
          if (packet.m_packetType == r->m_read.initialFrameType  
              && r->m_read.nInitialFrameSize == nPacketLen)  
            {  
              /* we don't compare the sizes since the packet can 
               * contain several FLV packets, just make sure the 
               * first frame is our keyframe (which we are going 
               * to rewrite) 
               */  
              if (memcmp  
              (r->m_read.initialFrame, packetBody,  
               r->m_read.nInitialFrameSize) == 0)  
            {  
              RTMP_Log(RTMP_LOGDEBUG, "Checked keyframe successfully!");  
              r->m_read.flags |= RTMP_READ_GOTKF;  
              /* ignore it! (what about audio data after it? it is 
               * handled by ignoring all 0ms frames, see below) 
               */  
              ret = RTMP_READ_IGNORE;  
              break;  
            }  
            }  
  
          /* hande FLV streams, even though the server resends the 
           * keyframe as an extra video packet it is also included 
           * in the first FLV stream chunk and we have to compare 
           * it and filter it out !! 
           */  
          //MessageID=22,聚合消息  
          if (packet.m_packetType == 0x16)  
            {  
              /* basically we have to find the keyframe with the 
               * correct TS being nResumeTS 
               */  
              unsigned int pos = 0;  
              uint32_t ts = 0;  
  
              while (pos + 11 < nPacketLen)  
            {  
              /* size without header (11) and prevTagSize (4) */  
              uint32_t dataSize =  
                AMF_DecodeInt24(packetBody + pos + 1);  
              ts = AMF_DecodeInt24(packetBody + pos + 4);  
              ts |= (packetBody[pos + 7] << 24);  
  
#ifdef _DEBUG  
              RTMP_Log(RTMP_LOGDEBUG,  
                  "keyframe search: FLV Packet: type %02X, dataSize: %d, timeStamp: %d ms",  
                  packetBody[pos], dataSize, ts);  
#endif  
              /* ok, is it a keyframe?: 
               * well doesn't work for audio! 
               */  
              if (packetBody[pos /*6928, test 0 */ ] ==  
                  r->m_read.initialFrameType  
                  /* && (packetBody[11]&0xf0) == 0x10 */ )  
                {  
                  if (ts == r->m_read.nResumeTS)  
                {  
                  RTMP_Log(RTMP_LOGDEBUG,  
                      "Found keyframe with resume-keyframe timestamp!");  
                  if (r->m_read.nInitialFrameSize != dataSize  
                      || memcmp(r->m_read.initialFrame,  
                        packetBody + pos + 11,  
                        r->m_read.  
                        nInitialFrameSize) != 0)  
                    {  
                      RTMP_Log(RTMP_LOGERROR,  
                      "FLV Stream: Keyframe doesn't match!");  
                      ret = RTMP_READ_ERROR;  
                      break;  
                    }  
                  r->m_read.flags |= RTMP_READ_GOTFLVK;  
  
                  /* skip this packet? 
                   * check whether skippable: 
                   */  
                  if (pos + 11 + dataSize + 4 > nPacketLen)  
                    {  
                      RTMP_Log(RTMP_LOGWARNING,  
                      "Non skipable packet since it doesn't end with chunk, stream corrupt!");  
                      ret = RTMP_READ_ERROR;  
                      break;  
                    }  
                  packetBody += (pos + 11 + dataSize + 4);  
                  nPacketLen -= (pos + 11 + dataSize + 4);  
  
                  goto stopKeyframeSearch;  
  
                }  
                  else if (r->m_read.nResumeTS < ts)  
                {  
                  /* the timestamp ts will only increase with 
                   * further packets, wait for seek 
                   */  
                  goto stopKeyframeSearch;  
                }  
                }  
              pos += (11 + dataSize + 4);  
            }  
              if (ts < r->m_read.nResumeTS)  
            {  
              RTMP_Log(RTMP_LOGERROR,  
                  "First packet does not contain keyframe, all "  
                  "timestamps are smaller than the keyframe "  
                  "timestamp; probably the resume seek failed?");  
            }  
            stopKeyframeSearch:  
              ;  
              if (!(r->m_read.flags & RTMP_READ_GOTFLVK))  
            {  
              RTMP_Log(RTMP_LOGERROR,  
                  "Couldn't find the seeked keyframe in this chunk!");  
              ret = RTMP_READ_IGNORE;  
              break;  
            }  
            }  
        }  
        }  
  
      if (packet.m_nTimeStamp > 0  
          && (r->m_read.flags & (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK)))  
        {  
          /* another problem is that the server can actually change from 
           * 09/08 video/audio packets to an FLV stream or vice versa and 
           * our keyframe check will prevent us from going along with the 
           * new stream if we resumed. 
           * 
           * in this case set the 'found keyframe' variables to true. 
           * We assume that if we found one keyframe somewhere and were 
           * already beyond TS > 0 we have written data to the output 
           * which means we can accept all forthcoming data including the 
           * change between 08/09 <-> FLV packets 
           */  
          r->m_read.flags |= (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK);  
        }  
  
      /* skip till we find our keyframe 
       * (seeking might put us somewhere before it) 
       */  
      if (!(r->m_read.flags & RTMP_READ_GOTKF) &&  
        packet.m_packetType != 0x16)  
        {  
          RTMP_Log(RTMP_LOGWARNING,  
          "Stream does not start with requested frame, ignoring data... ");  
          r->m_read.nIgnoredFrameCounter++;  
          if (r->m_read.nIgnoredFrameCounter > MAX_IGNORED_FRAMES)  
        ret = RTMP_READ_ERROR;  /* fatal error, couldn't continue stream */  
          else  
        ret = RTMP_READ_IGNORE;  
          break;  
        }  
      /* ok, do the same for FLV streams */  
      if (!(r->m_read.flags & RTMP_READ_GOTFLVK) &&  
        packet.m_packetType == 0x16)  
        {  
          RTMP_Log(RTMP_LOGWARNING,  
          "Stream does not start with requested FLV frame, ignoring data... ");  
          r->m_read.nIgnoredFlvFrameCounter++;  
          if (r->m_read.nIgnoredFlvFrameCounter > MAX_IGNORED_FRAMES)  
        ret = RTMP_READ_ERROR;  
          else  
        ret = RTMP_READ_IGNORE;  
          break;  
        }  
  
      /* we have to ignore the 0ms frames since these are the first 
       * keyframes; we've got these so don't mess around with multiple 
       * copies sent by the server to us! (if the keyframe is found at a 
       * later position there is only one copy and it will be ignored by 
       * the preceding if clause) 
       */  
      if (!(r->m_read.flags & RTMP_READ_NO_IGNORE) &&  
        packet.m_packetType != 0x16)  
        {           /* exclude type 0x16 (FLV) since it can 
                 * contain several FLV packets */  
          if (packet.m_nTimeStamp == 0)  
        {  
          ret = RTMP_READ_IGNORE;  
          break;  
        }  
          else  
        {  
          /* stop ignoring packets */  
          r->m_read.flags |= RTMP_READ_NO_IGNORE;  
        }  
        }  
    }  
  
      /* calculate packet size and allocate slop buffer if necessary */  
      size = nPacketLen +  
    ((packet.m_packetType == 0x08 || packet.m_packetType == 0x09  
      || packet.m_packetType == 0x12) ? 11 : 0) +  
    (packet.m_packetType != 0x16 ? 4 : 0);  
  
      if (size + 4 > buflen)  
    {  
      /* the extra 4 is for the case of an FLV stream without a last 
       * prevTagSize (we need extra 4 bytes to append it) */  
      r->m_read.buf = (char *) malloc(size + 4);  
      if (r->m_read.buf == 0)  
        {  
          RTMP_Log(RTMP_LOGERROR, "Couldn't allocate memory!");  
          ret = RTMP_READ_ERROR;        /* fatal error */  
          break;  
        }  
      recopy = TRUE;  
      ptr = r->m_read.buf;  
    }  
      else  
    {  
      ptr = buf;  
    }  
      pend = ptr + size + 4;  
  
      /* use to return timestamp of last processed packet */  
  
      /* audio (0x08), video (0x09) or metadata (0x12) packets : 
       * construct 11 byte header then add rtmp packet's data */  
      if (packet.m_packetType == 0x08 || packet.m_packetType == 0x09  
      || packet.m_packetType == 0x12)  
    {  
      nTimeStamp = r->m_read.nResumeTS + packet.m_nTimeStamp;  
      prevTagSize = 11 + nPacketLen;  
  
      *ptr = packet.m_packetType;  
      ptr++;  
      ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);  
  
#if 0  
        if(packet.m_packetType == 0x09) { /* video */  
  
         /* H264 fix: */  
         if((packetBody[0] & 0x0f) == 7) { /* CodecId = H264 */  
         uint8_t packetType = *(packetBody+1);  
  
         uint32_t ts = AMF_DecodeInt24(packetBody+2); /* composition time */  
         int32_t cts = (ts+0xff800000)^0xff800000;  
         RTMP_Log(RTMP_LOGDEBUG, "cts  : %d\n", cts);  
  
         nTimeStamp -= cts;  
         /* get rid of the composition time */  
         CRTMP::EncodeInt24(packetBody+2, 0);  
         }  
         RTMP_Log(RTMP_LOGDEBUG, "VIDEO: nTimeStamp: 0x%08X (%d)\n", nTimeStamp, nTimeStamp);  
         }  
#endif  
  
      ptr = AMF_EncodeInt24(ptr, pend, nTimeStamp);  
      *ptr = (char)((nTimeStamp & 0xFF000000) >> 24);  
      ptr++;  
  
      /* stream id */  
      ptr = AMF_EncodeInt24(ptr, pend, 0);  
    }  
  
      memcpy(ptr, packetBody, nPacketLen);  
      len = nPacketLen;  
  
      /* correct tagSize and obtain timestamp if we have an FLV stream */  
      if (packet.m_packetType == 0x16)  
    {  
      unsigned int pos = 0;  
      int delta;  
  
      /* grab first timestamp and see if it needs fixing */  
//    nTimeStamp = AMF_DecodeInt24(packetBody + 4);  
    //  nTimeStamp |= (packetBody[7] << 24);  
//    delta = packet.m_nTimeStamp - nTimeStamp;  
  
      while (pos + 11 < nPacketLen)  
        {  
          /* size without header (11) and without prevTagSize (4) */  
          uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1);  
          nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);  
          nTimeStamp |= (packetBody[pos + 7] << 24);  
  
//        if (delta)  
//      {  
//        nTimeStamp += delta;  
//        AMF_EncodeInt24(ptr+pos+4, pend, nTimeStamp);  
//        ptr[pos+7] = nTimeStamp>>24;  
//      }  
  
          /* set data type */  
          r->m_read.dataType |= (((*(packetBody + pos) == 0x08) << 2) |  
                     (*(packetBody + pos) == 0x09));  
  
          if (pos + 11 + dataSize + 4 > nPacketLen)  
        {  
          if (pos + 11 + dataSize > nPacketLen)  
            {  
              RTMP_Log(RTMP_LOGERROR,  
              "Wrong data size (%lu), stream corrupted, aborting!",  
              dataSize);  
              ret = RTMP_READ_ERROR;  
              break;  
            }  
          RTMP_Log(RTMP_LOGWARNING, "No tagSize found, appending!");  
  
          /* we have to append a last tagSize! */  
          prevTagSize = dataSize + 11;  
          AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,  
                  prevTagSize);  
          size += 4;  
          len += 4;  
        }  
          else  
        {  
          prevTagSize =  
            AMF_DecodeInt32(packetBody + pos + 11 + dataSize);  
  
#ifdef _DEBUG  
          RTMP_Log(RTMP_LOGDEBUG,  
              "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",  
              (unsigned char)packetBody[pos], dataSize, prevTagSize,  
              nTimeStamp);  
#endif  
  
          if (prevTagSize != (dataSize + 11))  
            {  
#ifdef _DEBUG  
              RTMP_Log(RTMP_LOGWARNING,  
              "Tag and data size are not consitent, writing tag size according to dataSize+11: %d",  
              dataSize + 11);  
#endif  
  
              prevTagSize = dataSize + 11;  
              AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,  
                      prevTagSize);  
            }  
        }  
  
          pos += prevTagSize + 4;   /*(11+dataSize+4); */  
        }  
    }  
      ptr += len;  
  
      if (packet.m_packetType != 0x16)  
    {  
      /* FLV tag packets contain their own prevTagSize */  
      AMF_EncodeInt32(ptr, pend, prevTagSize);  
    }  
  
      /* In non-live this nTimeStamp can contain an absolute TS. 
       * Update ext timestamp with this absolute offset in non-live mode 
       * otherwise report the relative one 
       */  
      /* RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, pktTS: %dms, TS: %dms, bLiveStream: %d", packet.m_packetType, nPacketLen, packet.m_nTimeStamp, nTimeStamp, r->Link.lFlags & RTMP_LF_LIVE); */  
      r->m_read.timestamp = (r->Link.lFlags & RTMP_LF_LIVE) ? packet.m_nTimeStamp : nTimeStamp;  
  
      ret = size;  
      break;  
    }  
  
  if (rtnGetNextMediaPacket)  
    RTMPPacket_Free(&packet);  
  
  if (recopy)  
    {  
      len = ret > buflen ? buflen : ret;  
      memcpy(buf, r->m_read.buf, len);  
      r->m_read.bufpos = r->m_read.buf + len;  
      r->m_read.buflen = ret - len;  
    }  
  return ret;  
}  

函数功能很多,重要的地方已经加上了注释,在此不再细分析。Read_1_Packet()里面实现从网络中读取视音频数据的函数是RTMP_GetNextMediaPacket()。下面我们来看看该函数的源代码:

[cpp]
view plain
copy

int  
RTMP_GetNextMediaPacket(RTMP *r, RTMPPacket *packet)  
{  
  int bHasMediaPacket = 0;  
  
  while (!bHasMediaPacket && RTMP_IsConnected(r)  
     && RTMP_ReadPacket(r, packet))  
    {  
      if (!RTMPPacket_IsReady(packet))  
    {  
      continue;  
    }  
  
      bHasMediaPacket = RTMP_ClientPacket(r, packet);  
  
      if (!bHasMediaPacket)  
    {  
      RTMPPacket_Free(packet);  
    }  
      else if (r->m_pausing == 3)  
    {  
      if (packet->m_nTimeStamp <= r->m_mediaStamp)  
        {  
          bHasMediaPacket = 0;  
#ifdef _DEBUG  
          RTMP_Log(RTMP_LOGDEBUG,  
          "Skipped type: %02X, size: %d, TS: %d ms, abs TS: %d, pause: %d ms",  
          packet->m_packetType, packet->m_nBodySize,  
          packet->m_nTimeStamp, packet->m_hasAbsTimestamp,  
          r->m_mediaStamp);  
#endif  
          continue;  
        }  
      r->m_pausing = 0;  
    }  
    }  
  
  if (bHasMediaPacket)  
    r->m_bPlaying = TRUE;  
  else if (r->m_sb.sb_timedout && !r->m_pausing)  
    r->m_pauseStamp = r->m_channelTimestamp[r->m_mediaChannel];  
  
  return bHasMediaPacket;  
}  

这里有两个函数比较重要:RTMP_ReadPacket()以及RTMP_ClientPacket()。这两个函数中,前一个函数负责从网络上读取数据,后一个负责处理数据。这部分与建立RTMP连接的网络流(NetStream)的时候很相似,参考:RTMPdump(libRTMP)
源代码分析 6: 建立一个流媒体连接 (NetStream部分 1)
RTMP_ClientPacket()在前文中已经做过分析,在此不再重复叙述。在这里重点分析一下RTMP_ReadPacket(),来看看它的源代码。

[cpp]
view plain
copy

//读取收下来的Chunk  
int  
RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)  
{  
    //packet 存读取完后的的数据  
    //Chunk Header最大值18  
  uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };  
    //header 指向的是从Socket中收下来的数据  
  char *header = (char *)hbuf;  
  int nSize, hSize, nToRead, nChunk;  
  int didAlloc = FALSE;  
  
  RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);  
  //收下来的数据存入hbuf  
  if (ReadN(r, (char *)hbuf, 1) == 0)  
    {  
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);  
      return FALSE;  
    }  
  //块类型fmt  
  packet->m_headerType = (hbuf[0] & 0xc0) >> 6;  
  //块流ID(2-63)  
  packet->m_nChannel = (hbuf[0] & 0x3f);  
  header++;  
  //块流ID第1字节为0时,块流ID占2个字节  
  if (packet->m_nChannel == 0)  
    {  
      if (ReadN(r, (char *)&hbuf[1], 1) != 1)  
    {  
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",  
          __FUNCTION__);  
      return FALSE;  
    }  
      //计算块流ID(64-319)  
      packet->m_nChannel = hbuf[1];  
      packet->m_nChannel += 64;  
      header++;  
    }  
  //块流ID第1字节为0时,块流ID占3个字节  
  else if (packet->m_nChannel == 1)  
    {  
      int tmp;  
      if (ReadN(r, (char *)&hbuf[1], 2) != 2)  
    {  
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",  
          __FUNCTION__);  
      return FALSE;  
    }  
      tmp = (hbuf[2] << 8) + hbuf[1];  
      //计算块流ID(64-65599)  
      packet->m_nChannel = tmp + 64;  
      RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);  
      header += 2;  
    }  
  //ChunkHeader的大小(4种)  
  nSize = packetSize[packet->m_headerType];  
  
  if (nSize == RTMP_LARGE_HEADER_SIZE)  /* if we get a full header the timestamp is absolute */  
    packet->m_hasAbsTimestamp = TRUE;    //11字节的完整ChunkMsgHeader的TimeStamp是绝对值  
  
  else if (nSize < RTMP_LARGE_HEADER_SIZE)  
    {               /* using values from the last message of this channel */  
      if (r->m_vecChannelsIn[packet->m_nChannel])  
    memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],  
           sizeof(RTMPPacket));  
    }  
  
  nSize--;  
  
  if (nSize > 0 && ReadN(r, header, nSize) != nSize)  
    {  
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",  
      __FUNCTION__, (unsigned int)hbuf[0]);  
      return FALSE;  
    }  
  
  hSize = nSize + (header - (char *)hbuf);  
  
  if (nSize >= 3)  
    {  
    //TimeStamp(注意 BigEndian to SmallEndian)(11,7,3字节首部都有)  
      packet->m_nTimeStamp = AMF_DecodeInt24(header);  
  
      /*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */  
    //消息长度(11,7字节首部都有)  
      if (nSize >= 6)  
    {  
      packet->m_nBodySize = AMF_DecodeInt24(header + 3);  
      packet->m_nBytesRead = 0;  
      RTMPPacket_Free(packet);  
    //(11,7字节首部都有)  
      if (nSize > 6)  
        {  
          //Msg type ID  
          packet->m_packetType = header[6];  
          //Msg Stream ID  
          if (nSize == 11)  
        packet->m_nInfoField2 = DecodeInt32LE(header + 7);  
        }  
    }  
      //Extend TimeStamp  
      if (packet->m_nTimeStamp == 0xffffff)  
    {  
      if (ReadN(r, header + nSize, 4) != 4)  
        {  
          RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",  
          __FUNCTION__);  
          return FALSE;  
        }  
      packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);  
      hSize += 4;  
    }  
    }  
  
  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);  
  
  if (packet->m_nBodySize > 0 && packet->m_body == NULL)  
    {  
      if (!RTMPPacket_Alloc(packet, packet->m_nBodySize))  
    {  
      RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);  
      return FALSE;  
    }  
      didAlloc = TRUE;  
      packet->m_headerType = (hbuf[0] & 0xc0) >> 6;  
    }  
  
  nToRead = packet->m_nBodySize - packet->m_nBytesRead;  
  nChunk = r->m_inChunkSize;  
  if (nToRead < nChunk)  
    nChunk = nToRead;  
  
  /* Does the caller want the raw chunk? */  
  if (packet->m_chunk)  
    {  
      packet->m_chunk->c_headerSize = hSize;  
      memcpy(packet->m_chunk->c_header, hbuf, hSize);  
      packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;  
      packet->m_chunk->c_chunkSize = nChunk;  
    }  
  
  if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk)  
    {  
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %lu",  
      __FUNCTION__, packet->m_nBodySize);  
      return FALSE;  
    }  
  
  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);  
  
  packet->m_nBytesRead += nChunk;  
  
  /* keep the packet as ref for other packets on this channel */  
  if (!r->m_vecChannelsIn[packet->m_nChannel])  
    r->m_vecChannelsIn[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket));  
  memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));  
  //读取完毕  
  if (RTMPPacket_IsReady(packet))  
    {  
      /* make packet's timestamp absolute */  
      if (!packet->m_hasAbsTimestamp)  
    packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */  
  
      r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;  
  
      /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel */  
      /* arrives and requests to re-use some info (small packet header) */  
      r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;  
      r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;  
      r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE;   /* can only be false if we reuse header */  
    }  
  else  
    {  
      packet->m_body = NULL; /* so it won't be erased on free */  
    }  
  
  return TRUE;  
}  

函数代码看似很多,但是并不是很复杂,可以理解为在从事“简单重复性劳动”(和搬砖差不多)。基本上是一个字节一个字节的读取,然后按照RTMP协议规范进行解析。具体如何解析可以参考RTMP协议规范。
在RTMP_ReadPacket()函数里完成从Socket中读取数据的函数是ReadN(),继续看看它的源代码:

[cpp]
view plain
copy

//从HTTP或SOCKET中读取数据  
static int  
ReadN(RTMP *r, char *buffer, int n)  
{  
  int nOriginalSize = n;  
  int avail;  
  char *ptr;  
  
  r->m_sb.sb_timedout = FALSE;  
  
#ifdef _DEBUG  
  memset(buffer, 0, n);  
#endif  
  
  ptr = buffer;  
  while (n > 0)  
    {  
      int nBytes = 0, nRead;  
      if (r->Link.protocol & RTMP_FEATURE_HTTP)  
        {  
      while (!r->m_resplen)  
        {  
          if (r->m_sb.sb_size < 144)  
            {  
          if (!r->m_unackd)  
            HTTP_Post(r, RTMPT_IDLE, "", 1);  
          if (RTMPSockBuf_Fill(&r->m_sb) < 1)  
            {  
              if (!r->m_sb.sb_timedout)  
                RTMP_Close(r);  
              return 0;  
            }  
        }  
          HTTP_read(r, 0);  
        }  
      if (r->m_resplen && !r->m_sb.sb_size)  
        RTMPSockBuf_Fill(&r->m_sb);  
          avail = r->m_sb.sb_size;  
      if (avail > r->m_resplen)  
        avail = r->m_resplen;  
    }  
      else  
        {  
          avail = r->m_sb.sb_size;  
      if (avail == 0)  
        {  
          if (RTMPSockBuf_Fill(&r->m_sb) < 1)  
            {  
              if (!r->m_sb.sb_timedout)  
                RTMP_Close(r);  
              return 0;  
        }  
          avail = r->m_sb.sb_size;  
        }  
    }  
      nRead = ((n < avail) ? n : avail);  
      if (nRead > 0)  
    {  
      memcpy(ptr, r->m_sb.sb_start, nRead);  
      r->m_sb.sb_start += nRead;  
      r->m_sb.sb_size -= nRead;  
      nBytes = nRead;  
      r->m_nBytesIn += nRead;  
      if (r->m_bSendCounter  
          && r->m_nBytesIn > r->m_nBytesInSent + r->m_nClientBW / 2)  
        SendBytesReceived(r);  
    }  
      /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes\n", __FUNCTION__, nBytes); */  
#ifdef _DEBUG  
      fwrite(ptr, 1, nBytes, netstackdump_read);  
#endif  
  
      if (nBytes == 0)  
    {  
      RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__);  
      /*goto again; */  
      RTMP_Close(r);  
      break;  
    }  
  
      if (r->Link.protocol & RTMP_FEATURE_HTTP)  
    r->m_resplen -= nBytes;  
  
#ifdef CRYPTO  
      if (r->Link.rc4keyIn)  
    {  
      RC4_encrypt((RC4_KEY *)r->Link.rc4keyIn, nBytes, ptr);  
    }  
#endif  
  
      n -= nBytes;  
      ptr += nBytes;  
    }  
  
  return nOriginalSize - n;  
}  

ReadN()中实现从Socket中接收数据的函数是RTMPSockBuf_Fill(),看看代码吧(又是层层调用)。

[cpp]
view plain
copy

//调用Socket编程中的recv()函数,接收数据  
int  
RTMPSockBuf_Fill(RTMPSockBuf *sb)  
{  
  int nBytes;  
  
  if (!sb->sb_size)  
    sb->sb_start = sb->sb_buf;  
  
  while (1)  
    {  
    //缓冲区长度:总长-未处理字节-已处理字节  
    //|-----已处理--------|-----未处理--------|---------缓冲区----------|  
    //sb_buf        sb_start    sb_size       
      nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf);  
#if defined(CRYPTO) && !defined(NO_SSL)  
      if (sb->sb_ssl)  
    {  
      nBytes = TLS_read((SSL *)sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);  
    }  
      else  
#endif  
    {  
    //int recv( SOCKET s, char * buf, int len, int flags);  
    //s:一个标识已连接套接口的描述字。  
    //buf:用于接收数据的缓冲区。   
    //len:缓冲区长度。  
    //flags:指定调用方式。  
    //从sb_start(待处理的下一字节) + sb_size()还未处理的字节开始buffer为空,可以存储  
        nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);  
    }  
      if (nBytes != -1)  
    {  
    //未处理的字节又多了  
      sb->sb_size += nBytes;  
    }  
      else  
    {  
      int sockerr = GetSockError();  
      RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)",  
          __FUNCTION__, nBytes, sockerr, strerror(sockerr));  
      if (sockerr == EINTR && !RTMP_ctrlC)  
        continue;  
  
      if (sockerr == EWOULDBLOCK || sockerr == EAGAIN)  
        {  
          sb->sb_timedout = TRUE;  
          nBytes = 0;  
        }  
    }  
      break;  
    }  
  
  return nBytes;  
}  

从RTMPSockBuf_Fill()代码中可以看出,调用了系统Socket的recv()函数接收RTMP连接传输过来的数据。

rtmpdump源代码(Linux):http://download.csdn.net/detail/leixiaohua1020/6376561

rtmpdump源代码(VC 2005 工程):http://download.csdn.net/detail/leixiaohua1020/6563163
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息