Live555 Study Notes (3): RTSP Command Handling and Setting Up the RTP and RTCP Services
2016-07-04 20:50
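The previous chapter covered how the RTSP service operates. Once the RTSP server has been created, RTSPServer::RTSPClientConnection::handleRequestBytes is called repeatedly to pick up commands from the client (OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, GET_PARAMETER, SET_PARAMETER). Whenever a complete command has been received, control passes to the corresponding handler function. The full definition of the function is listed below.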
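The following notes refer to the numbered markers in OnDemandServerMediaSubsession::getStreamParameters, the function that creates the RTP and RTCP objects:
(31.0) Increment the reference count.
(32.0) In the normal case, create a new media source.
(33.0) Create the source. A source is also created while the DESCRIBE command is handled, in OnDemandServerMediaSubsession::sdpLines(), but there the clientSessionId argument is 0.
(34.0) Create a pair of Groupsock instances, one for RTP and one for RTCP. The RTP and RTCP port numbers are adjacent, and the RTP port number is even. The initial port is fInitialPortNum = 6970, which is the default argument of the OnDemandServerMediaSubsession constructor.
(35.0) Create the RTPSink.
(36.0) Resize the send buffer of the socket used for RTP.
(37.0) Create the stream state object (the stream token), which ties together the sink, the source, the groupsocks and so on. Note that live555 defines two stream-state types. The StreamState used here is a class. RTSPServer additionally defines an internal struct, streamState, whose streamToken member points to this StreamState instance.
(38.0) A Destinations class stores the destination address, the RTP port and the RTCP port. Each instance is saved, keyed by clientSessionId, in the hash table fDestinationsHashTable, which is a member of OnDemandServerMediaSubsession.
At this point RTSP, RTP and RTCP have all been set up; what remains is the actual data transfer.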
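The port rule in note (34.0), an even RTP port with RTCP on the next odd port, searched upward from 6970, can be illustrated with a small self-contained sketch. This is not the live555 implementation: pickRtpRtcpPorts and the isFree callback are hypothetical names, and isFree merely stands in for "a UDP socket could be bound to this port".

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper (not a live555 function): find an adjacent port pair
// where the RTP port is even and the RTCP port is RTP + 1.
// isFree(port) is an assumed callback that reports whether a port is usable.
// Returns {0, 0} if no pair is available.
std::pair<unsigned, unsigned> pickRtpRtcpPorts(
    unsigned initialPort, const std::function<bool(unsigned)>& isFree) {
  unsigned rtp = (initialPort + 1) & ~1u;  // round up to an even port number
  for (; rtp + 1 <= 65535; rtp += 2) {
    if (isFree(rtp) && isFree(rtp + 1)) {
      return {rtp, rtp + 1};  // RTP on the even port, RTCP on the next one
    }
  }
  return {0, 0};
}
```

Called with 6970 and a callback that reports every port as free, the sketch yields the pair 6970/6971. If 6970 is taken, it moves on to 6972/6973, keeping the even/odd pairing intact.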
void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead) {
  int numBytesRemaining = 0;
  ++fRecursionCount;

  do {
    RTSPServer::RTSPClientSession* clientSession = NULL;

    if (newBytesRead < 0 || (unsigned)newBytesRead >= fRequestBufferBytesLeft) {
      // Either the client socket has died, or the request was too big for us.
      // Terminate this connection:
#ifdef DEBUG
      fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() read %d new bytes (of %d); terminating connection!\n", this, newBytesRead, fRequestBufferBytesLeft);
#endif
      fIsActive = False;
      break;
    }

    Boolean endOfMsg = False;
    unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];
#ifdef DEBUG
    ptr[newBytesRead] = '\0';
    fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() %s %d new bytes:%s\n",
            this, numBytesRemaining > 0 ? "processing" : "read", newBytesRead, ptr);
#endif

    if (fClientOutputSocket != fClientInputSocket && numBytesRemaining == 0) {
      // We're doing RTSP-over-HTTP tunneling, and input commands are assumed to have been Base64-encoded.
      // We therefore Base64-decode as much of this new data as we can (i.e., up to a multiple of 4 bytes).

      // But first, we remove any whitespace that may be in the input data:
      unsigned toIndex = 0;
      for (int fromIndex = 0; fromIndex < newBytesRead; ++fromIndex) {
        char c = ptr[fromIndex];
        if (!(c == ' ' || c == '\t' || c == '\r' || c == '\n')) { // not 'whitespace': space,tab,CR,NL
          ptr[toIndex++] = c;
        }
      }
      newBytesRead = toIndex;

      unsigned numBytesToDecode = fBase64RemainderCount + newBytesRead;
      unsigned newBase64RemainderCount = numBytesToDecode%4;
      numBytesToDecode -= newBase64RemainderCount;
      if (numBytesToDecode > 0) {
        ptr[newBytesRead] = '\0';
        unsigned decodedSize;
        unsigned char* decodedBytes = base64Decode((char const*)(ptr-fBase64RemainderCount), numBytesToDecode, decodedSize);
#ifdef DEBUG
        fprintf(stderr, "Base64-decoded %d input bytes into %d new bytes:", numBytesToDecode, decodedSize);
        for (unsigned k = 0; k < decodedSize; ++k) fprintf(stderr, "%c", decodedBytes[k]);
        fprintf(stderr, "\n");
#endif

        // Copy the new decoded bytes in place of the old ones (we can do this because there are fewer decoded bytes than original):
        unsigned char* to = ptr-fBase64RemainderCount;
        for (unsigned i = 0; i < decodedSize; ++i) *to++ = decodedBytes[i];

        // Then copy any remaining (undecoded) bytes to the end:
        for (unsigned j = 0; j < newBase64RemainderCount; ++j) *to++ = (ptr-fBase64RemainderCount+numBytesToDecode)[j];

        newBytesRead = decodedSize - fBase64RemainderCount + newBase64RemainderCount;
          // adjust to allow for the size of the new decoded data (+ remainder)
        delete[] decodedBytes;
      }
      fBase64RemainderCount = newBase64RemainderCount;
    }

    unsigned char* tmpPtr = fLastCRLF + 2;
    if (fBase64RemainderCount == 0) { // no more Base-64 bytes remain to be read/decoded
      // Look for the end of the message: <CR><LF><CR><LF>
      if (tmpPtr < fRequestBuffer) tmpPtr = fRequestBuffer;
      while (tmpPtr < &ptr[newBytesRead-1]) {
        if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') {
          if (tmpPtr - fLastCRLF == 2) { // This is it:
            endOfMsg = True;
            break;
          }
          fLastCRLF = tmpPtr;
        }
        ++tmpPtr;
      }
    }

    fRequestBufferBytesLeft -= newBytesRead;
    fRequestBytesAlreadySeen += newBytesRead;

    if (!endOfMsg) break; // subsequent reads will be needed to complete the request

    // Parse the request string into command name and 'CSeq', then handle the command:
    fRequestBuffer[fRequestBytesAlreadySeen] = '\0';
    char cmdName[RTSP_PARAM_STRING_MAX];
    char urlPreSuffix[RTSP_PARAM_STRING_MAX];
    char urlSuffix[RTSP_PARAM_STRING_MAX];
    char cseq[RTSP_PARAM_STRING_MAX];
    char sessionIdStr[RTSP_PARAM_STRING_MAX];
    unsigned contentLength = 0;
    fLastCRLF[2] = '\0'; // temporarily, for parsing
    Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer,
                                                    cmdName, sizeof cmdName,
                                                    urlPreSuffix, sizeof urlPreSuffix,
                                                    urlSuffix, sizeof urlSuffix,
                                                    cseq, sizeof cseq,
                                                    sessionIdStr, sizeof sessionIdStr,
                                                    contentLength);
    fLastCRLF[2] = '\r'; // restore its value
    Boolean playAfterSetup = False;
    if (parseSucceeded) {
#ifdef DEBUG
      fprintf(stderr, "parseRTSPRequestString() succeeded, returning cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\", CSeq \"%s\", Content-Length %u, with %d bytes following the message.\n", cmdName, urlPreSuffix, urlSuffix, cseq, contentLength, ptr + newBytesRead - (tmpPtr + 2));
#endif
      // If there was a "Content-Length:" header, then make sure we've received all of the data that it specified:
      if (ptr + newBytesRead < tmpPtr + 2 + contentLength) break; // we still need more data; subsequent reads will give it to us

      // If the request included a "Session:" id, and it refers to a client session that's
      // current ongoing, then use this command to indicate 'liveness' on that client session:
      Boolean const requestIncludedSessionId = sessionIdStr[0] != '\0';
      if (requestIncludedSessionId) {
        clientSession = (RTSPServer::RTSPClientSession*)(fOurRTSPServer.lookupClientSession(sessionIdStr));
        if (clientSession != NULL) clientSession->noteLiveness();
      }

      // We now have a complete RTSP request.
      // Handle the specified command (beginning with commands that are session-independent):
      fCurrentCSeq = cseq;
      if (strcmp(cmdName, "OPTIONS") == 0) {
        // If the "OPTIONS" command included a "Session:" id for a session that doesn't exist,
        // then treat this as an error:
        if (requestIncludedSessionId && clientSession == NULL) {
          handleCmd_sessionNotFound();
        } else {
          // Normal case:
          handleCmd_OPTIONS();
        }
      } else if (urlPreSuffix[0] == '\0' && urlSuffix[0] == '*' && urlSuffix[1] == '\0') {
        // The special "*" URL means: an operation on the entire server. This works only for GET_PARAMETER and SET_PARAMETER:
        if (strcmp(cmdName, "GET_PARAMETER") == 0) {
          handleCmd_GET_PARAMETER((char const*)fRequestBuffer);
        } else if (strcmp(cmdName, "SET_PARAMETER") == 0) {
          handleCmd_SET_PARAMETER((char const*)fRequestBuffer);
        } else {
          handleCmd_notSupported();
        }
      } else if (strcmp(cmdName, "DESCRIBE") == 0) {
        handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
      } else if (strcmp(cmdName, "SETUP") == 0) {
        Boolean areAuthenticated = True;

        if (!requestIncludedSessionId) {
          // No session id was present in the request.
          // So create a new "RTSPClientSession" object for this request.

          // But first, make sure that we're authenticated to perform this command:
          char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX]; // enough space for urlPreSuffix/urlSuffix'\0'
          urlTotalSuffix[0] = '\0';
          if (urlPreSuffix[0] != '\0') {
            strcat(urlTotalSuffix, urlPreSuffix);
            strcat(urlTotalSuffix, "/");
          }
          strcat(urlTotalSuffix, urlSuffix);
          if (authenticationOK("SETUP", urlTotalSuffix, (char const*)fRequestBuffer)) {
            clientSession = (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
          } else {
            areAuthenticated = False;
          }
        }
        if (clientSession != NULL) {
          clientSession->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
          playAfterSetup = clientSession->fStreamAfterSETUP;
        } else if (areAuthenticated) {
          handleCmd_sessionNotFound();
        }
      } else if (strcmp(cmdName, "TEARDOWN") == 0
                 || strcmp(cmdName, "PLAY") == 0
                 || strcmp(cmdName, "PAUSE") == 0
                 || strcmp(cmdName, "GET_PARAMETER") == 0
                 || strcmp(cmdName, "SET_PARAMETER") == 0) {
        if (clientSession != NULL) {
          clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
        } else {
          handleCmd_sessionNotFound();
        }
      } else if (strcmp(cmdName, "REGISTER") == 0) {
        // Because - unlike other commands - an implementation of this command needs
        // the entire URL, we re-parse the command to get it:
        char* url = strDupSize((char*)fRequestBuffer);
        if (sscanf((char*)fRequestBuffer, "%*s %s", url) == 1) {
          // Check for special command-specific parameters in a "Transport:" header:
          Boolean reuseConnection, deliverViaTCP;
          char* proxyURLSuffix;
          parseTransportHeaderForREGISTER((const char*)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);

          handleCmd_REGISTER(url, urlSuffix, (char const*)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
          delete[] proxyURLSuffix;
        } else {
          handleCmd_bad();
        }
        delete[] url;
      } else {
        // The command is one that we don't handle:
        handleCmd_notSupported();
      }
    } else {
#ifdef DEBUG
      fprintf(stderr, "parseRTSPRequestString() failed; checking now for HTTP commands (for RTSP-over-HTTP tunneling)...\n");
#endif
      // The request was not (valid) RTSP, but check for a special case: HTTP commands (for setting up RTSP-over-HTTP tunneling):
      char sessionCookie[RTSP_PARAM_STRING_MAX];
      char acceptStr[RTSP_PARAM_STRING_MAX];
      *fLastCRLF = '\0'; // temporarily, for parsing
      parseSucceeded = parseHTTPRequestString(cmdName, sizeof cmdName,
                                              urlSuffix, sizeof urlPreSuffix,
                                              sessionCookie, sizeof sessionCookie,
                                              acceptStr, sizeof acceptStr);
      *fLastCRLF = '\r';
      if (parseSucceeded) {
#ifdef DEBUG
        fprintf(stderr, "parseHTTPRequestString() succeeded, returning cmdName \"%s\", urlSuffix \"%s\", sessionCookie \"%s\", acceptStr \"%s\"\n", cmdName, urlSuffix, sessionCookie, acceptStr);
#endif
        // Check that the HTTP command is valid for RTSP-over-HTTP tunneling: There must be a 'session cookie'.
        Boolean isValidHTTPCmd = True;
        if (strcmp(cmdName, "OPTIONS") == 0) {
          handleHTTPCmd_OPTIONS();
        } else if (sessionCookie[0] == '\0') {
          // There was no "x-sessioncookie:" header. If there was an "Accept: application/x-rtsp-tunnelled" header,
          // then this is a bad tunneling request. Otherwise, assume that it's an attempt to access the stream via HTTP.
          if (strcmp(acceptStr, "application/x-rtsp-tunnelled") == 0) {
            isValidHTTPCmd = False;
          } else {
            handleHTTPCmd_StreamingGET(urlSuffix, (char const*)fRequestBuffer);
          }
        } else if (strcmp(cmdName, "GET") == 0) {
          handleHTTPCmd_TunnelingGET(sessionCookie);
        } else if (strcmp(cmdName, "POST") == 0) {
          // We might have received additional data following the HTTP "POST" command - i.e., the first Base64-encoded RTSP command.
          // Check for this, and handle it if it exists:
          unsigned char const* extraData = fLastCRLF+4;
          unsigned extraDataSize = &fRequestBuffer[fRequestBytesAlreadySeen] - extraData;
          if (handleHTTPCmd_TunnelingPOST(sessionCookie, extraData, extraDataSize)) {
            // We don't respond to the "POST" command, and we go away:
            fIsActive = False;
            break;
          }
        } else {
          isValidHTTPCmd = False;
        }
        if (!isValidHTTPCmd) {
          handleHTTPCmd_notSupported();
        }
      } else {
#ifdef DEBUG
        fprintf(stderr, "parseHTTPRequestString() failed!\n");
#endif
        handleCmd_bad();
      }
    }

#ifdef DEBUG
    fprintf(stderr, "sending response: %s", fResponseBuffer);
#endif
    send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);

    if (playAfterSetup) {
      // The client has asked for streaming to commence now, rather than after a
      // subsequent "PLAY" command. So, simulate the effect of a "PLAY" command:
      clientSession->handleCmd_withinSession(this, "PLAY", urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
    }

    // Check whether there are extra bytes remaining in the buffer, after the end of the request (a rare case).
    // If so, move them to the front of our buffer, and keep processing it, because it might be a following, pipelined request.
    unsigned requestSize = (fLastCRLF+4-fRequestBuffer) + contentLength;
    numBytesRemaining = fRequestBytesAlreadySeen - requestSize;
    resetRequestBuffer(); // to prepare for any subsequent request

    if (numBytesRemaining > 0) {
      memmove(fRequestBuffer, &fRequestBuffer[requestSize], numBytesRemaining);
      newBytesRead = numBytesRemaining;
    }
  } while (numBytesRemaining > 0);

  --fRecursionCount;
  if (!fIsActive) {
    if (fRecursionCount > 0) closeSockets(); else delete this;
    // Note: The "fRecursionCount" test is for a pathological situation where we reenter the event loop and get called recursively
    // while handling a command (e.g., while handling a "DESCRIBE", to get a SDP description).
    // In such a case we don't want to actually delete ourself until we leave the outermost call.
  }
}

The function is long, so it is not analyzed line by line here. The relationship between the commands and their handler functions is shown in the following figure:
[Figure: mapping from RTSP commands to their handler functions]
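The command-to-handler mapping in handleRequestBytes can be condensed into a small lookup, shown here as a hedged sketch rather than live555 code. handlerFor is a hypothetical helper that only returns the name of the handler the listing would reach. It assumes authentication succeeds for a SETUP without a session id, and it leaves out the special "*" URL case and the RTSP-over-HTTP path.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical helper (not part of live555): name of the handler that the
// dispatch in handleRequestBytes() reaches for a given RTSP command.
// requestIncludedSessionId: the request carried a "Session:" header.
// sessionFound: that id matched an existing client session.
const char* handlerFor(const char* cmdName,
                       bool requestIncludedSessionId,
                       bool sessionFound) {
  auto is = [cmdName](const char* name) {
    return std::strcmp(cmdName, name) == 0;
  };

  if (is("OPTIONS")) {
    return (requestIncludedSessionId && !sessionFound)
               ? "handleCmd_sessionNotFound"
               : "handleCmd_OPTIONS";
  }
  if (is("DESCRIBE")) return "handleCmd_DESCRIBE";
  if (is("SETUP")) {
    // Without a "Session:" header a new client session is created first
    // (assumed here to pass authentication), so SETUP can proceed.
    bool haveSession = !requestIncludedSessionId || sessionFound;
    return haveSession ? "handleCmd_SETUP" : "handleCmd_sessionNotFound";
  }
  if (is("TEARDOWN") || is("PLAY") || is("PAUSE") ||
      is("GET_PARAMETER") || is("SET_PARAMETER")) {
    // These commands only make sense inside an existing session.
    return sessionFound ? "handleCmd_withinSession"
                        : "handleCmd_sessionNotFound";
  }
  if (is("REGISTER")) return "handleCmd_REGISTER";
  return "handleCmd_notSupported";
}
```

For example, handlerFor("PLAY", true, true) names handleCmd_withinSession, which is the entry point through which the PLAY handler is reached for an existing session.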
Quite a few commands are involved; only the SETUP and PLAY commands are examined here:
void RTSPServer::RTSPClientSession
::handleCmd_SETUP(RTSPServer::RTSPClientConnection* ourClientConnection,
                  char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {
  // Normally, "urlPreSuffix" should be the session (stream) name, and "urlSuffix" should be the subsession (track) name.
  // However (being "liberal in what we accept"), we also handle 'aggregate' SETUP requests (i.e., without a track name),
  // in the special case where we have only a single track. I.e., in this case, we also handle:
  //    "urlPreSuffix" is empty and "urlSuffix" is the session (stream) name, or
  //    "urlPreSuffix" concatenated with "urlSuffix" (with "/" inbetween) is the session (stream) name.
  char const* streamName = urlPreSuffix; // in the normal case
  char const* trackId = urlSuffix; // in the normal case
  char* concatenatedStreamName = NULL; // in the normal case

  do {
    // First, make sure the specified stream name exists:
    ServerMediaSession* sms = fOurServer.lookupServerMediaSession(streamName, fOurServerMediaSession == NULL); // (1.0)
    if (sms == NULL) { // (2.0)
      // Check for the special case (noted above), before we give up:
      if (urlPreSuffix[0] == '\0') {
        streamName = urlSuffix;
      } else {
        concatenatedStreamName = new char[strlen(urlPreSuffix) + strlen(urlSuffix) + 2]; // allow for the "/" and the trailing '\0'
        sprintf(concatenatedStreamName, "%s/%s", urlPreSuffix, urlSuffix);
        streamName = concatenatedStreamName;
      }
      trackId = NULL;

      // Check again:
      sms = fOurServer.lookupServerMediaSession(streamName, fOurServerMediaSession == NULL);
    }
    if (sms == NULL) {
      if (fOurServerMediaSession == NULL) {
        // The client asked for a stream that doesn't exist (and this session descriptor has not been used before):
        ourClientConnection->handleCmd_notFound();
      } else {
        // The client asked for a stream that doesn't exist, but using a stream id for a stream that does exist. Bad request:
        ourClientConnection->handleCmd_bad();
      }
      break;
    } else {
      if (fOurServerMediaSession == NULL) {
        // We're accessing the "ServerMediaSession" for the first time.
        fOurServerMediaSession = sms;
        fOurServerMediaSession->incrementReferenceCount();
      } else if (sms != fOurServerMediaSession) {
        // The client asked for a stream that's different from the one originally requested for this stream id. Bad request:
        ourClientConnection->handleCmd_bad();
        break;
      }
    }

    if (fStreamStates == NULL) { // (3.0)
      // This is the first "SETUP" for this session. Set up our array of states for all of this session's subsessions (tracks):
      ServerMediaSubsessionIterator iter(*fOurServerMediaSession);
      for (fNumStreamStates = 0; iter.next() != NULL; ++fNumStreamStates) {} // begin by counting the number of subsessions (tracks)

      fStreamStates = new struct streamState[fNumStreamStates];

      iter.reset();
      ServerMediaSubsession* subsession;
      for (unsigned i = 0; i < fNumStreamStates; ++i) {
        subsession = iter.next();
        fStreamStates[i].subsession = subsession;
        fStreamStates[i].tcpSocketNum = -1; // for now; may get set for RTP-over-TCP streaming
        fStreamStates[i].streamToken = NULL; // for now; it may be changed by the "getStreamParameters()" call that comes later
      }
    }

    // Look up information for the specified subsession (track): (4.0)
    ServerMediaSubsession* subsession = NULL;
    unsigned trackNum;
    if (trackId != NULL && trackId[0] != '\0') { // normal case
      for (trackNum = 0; trackNum < fNumStreamStates; ++trackNum) {
        subsession = fStreamStates[trackNum].subsession;
        if (subsession != NULL && strcmp(trackId, subsession->trackId()) == 0) break;
      }
      if (trackNum >= fNumStreamStates) {
        // The specified track id doesn't exist, so this request fails:
        ourClientConnection->handleCmd_notFound();
        break;
      }
    } else {
      // Weird case: there was no track id in the URL. (5.0)
      // This works only if we have only one subsession:
      if (fNumStreamStates != 1 || fStreamStates[0].subsession == NULL) {
        ourClientConnection->handleCmd_bad();
        break;
      }
      trackNum = 0;
      subsession = fStreamStates[trackNum].subsession;
    }
    // ASSERT: subsession != NULL

    void*& token = fStreamStates[trackNum].streamToken; // alias
    if (token != NULL) {
      // We already handled a "SETUP" for this track (to the same client),
      // so stop any existing streaming of it, before we set it up again:
      subsession->pauseStream(fOurSessionId, token);
      fOurRTSPServer.unnoteTCPStreamingOnSocket(fStreamStates[trackNum].tcpSocketNum, this, trackNum);
      subsession->deleteStream(fOurSessionId, token);
    }

    // Look for a "Transport:" header in the request string, to extract client parameters: (6.0)
    StreamingMode streamingMode;
    char* streamingModeString = NULL; // set when RAW_UDP streaming is specified
    char* clientsDestinationAddressStr;
    u_int8_t clientsDestinationTTL;
    portNumBits clientRTPPortNum, clientRTCPPortNum;
    unsigned char rtpChannelId, rtcpChannelId;
    parseTransportHeader(fullRequestStr, streamingMode, streamingModeString,
                         clientsDestinationAddressStr, clientsDestinationTTL,
                         clientRTPPortNum, clientRTCPPortNum,
                         rtpChannelId, rtcpChannelId);
    if ((streamingMode == RTP_TCP && rtpChannelId == 0xFF) ||
        (streamingMode != RTP_TCP && ourClientConnection->fClientOutputSocket != ourClientConnection->fClientInputSocket)) {
      // An anomolous situation, caused by a buggy client. Either:
      //     1/ TCP streaming was requested, but with no "interleaving=" fields. (QuickTime Player sometimes does this.), or
      //     2/ TCP streaming was not requested, but we're doing RTSP-over-HTTP tunneling (which implies TCP streaming).
      // In either case, we assume TCP streaming, and set the RTP and RTCP channel ids to proper values:
      streamingMode = RTP_TCP;
      rtpChannelId = fTCPStreamIdCount; rtcpChannelId = fTCPStreamIdCount+1;
    }
    if (streamingMode == RTP_TCP) fTCPStreamIdCount += 2;

    Port clientRTPPort(clientRTPPortNum);
    Port clientRTCPPort(clientRTCPPortNum);

    // Next, check whether a "Range:" or "x-playNow:" header is present in the request. (7.0)
    // This isn't legal, but some clients do this to combine "SETUP" and "PLAY":
    double rangeStart = 0.0, rangeEnd = 0.0;
    char* absStart = NULL; char* absEnd = NULL;
    Boolean startTimeIsNow;
    if (parseRangeHeader(fullRequestStr, rangeStart, rangeEnd, absStart, absEnd, startTimeIsNow)) {
      delete[] absStart; delete[] absEnd;
      fStreamAfterSETUP = True;
    } else if (parsePlayNowHeader(fullRequestStr)) {
      fStreamAfterSETUP = True;
    } else {
      fStreamAfterSETUP = False;
    }

    // Then, get server parameters from the 'subsession':
    if (streamingMode == RTP_TCP) {
      // Note that we'll be streaming over the RTSP TCP connection:
      fStreamStates[trackNum].tcpSocketNum = ourClientConnection->fClientOutputSocket;
      fOurRTSPServer.noteTCPStreamingOnSocket(fStreamStates[trackNum].tcpSocketNum, this, trackNum);
    }
    netAddressBits destinationAddress = 0;
    u_int8_t destinationTTL = 255;
#ifdef RTSP_ALLOW_CLIENT_DESTINATION_SETTING
    if (clientsDestinationAddressStr != NULL) {
      // Use the client-provided "destination" address.
      // Note: This potentially allows the server to be used in denial-of-service
      // attacks, so don't enable this code unless you're sure that clients are
      // trusted.
      destinationAddress = our_inet_addr(clientsDestinationAddressStr);
    }
    // Also use the client-provided TTL.
    destinationTTL = clientsDestinationTTL;
#endif
    delete[] clientsDestinationAddressStr;
    Port serverRTPPort(0);
    Port serverRTCPPort(0);

    // Make sure that we transmit on the same interface that's used by the client (in case we're a multi-homed server):
    struct sockaddr_in sourceAddr; SOCKLEN_T namelen = sizeof sourceAddr;
    getsockname(ourClientConnection->fClientInputSocket, (struct sockaddr*)&sourceAddr, &namelen);
    netAddressBits origSendingInterfaceAddr = SendingInterfaceAddr;
    netAddressBits origReceivingInterfaceAddr = ReceivingInterfaceAddr;
    // NOTE: The following might not work properly, so we ifdef it out for now:
#ifdef HACK_FOR_MULTIHOMED_SERVERS
    ReceivingInterfaceAddr = SendingInterfaceAddr = sourceAddr.sin_addr.s_addr;
#endif

    subsession->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr, // (8.0)
                                    clientRTPPort, clientRTCPPort,
                                    fStreamStates[trackNum].tcpSocketNum, rtpChannelId, rtcpChannelId,
                                    destinationAddress, destinationTTL, fIsMulticast,
                                    serverRTPPort, serverRTCPPort,
                                    fStreamStates[trackNum].streamToken);
    SendingInterfaceAddr = origSendingInterfaceAddr; // (9.0)
    ReceivingInterfaceAddr = origReceivingInterfaceAddr;

    AddressString destAddrStr(destinationAddress);
    AddressString sourceAddrStr(sourceAddr);
    char timeoutParameterString[100];
    if (fOurRTSPServer.fReclamationSeconds > 0) {
      sprintf(timeoutParameterString, ";timeout=%u", fOurRTSPServer.fReclamationSeconds);
    } else {
      timeoutParameterString[0] = '\0';
    }
    if (fIsMulticast) {
      switch (streamingMode) {
        case RTP_UDP: {
          snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
                   "RTSP/1.0 200 OK\r\n"
                   "CSeq: %s\r\n"
                   "%s"
                   "Transport: RTP/AVP;multicast;destination=%s;source=%s;port=%d-%d;ttl=%d\r\n"
                   "Session: %08X%s\r\n\r\n",
                   ourClientConnection->fCurrentCSeq,
                   dateHeader(),
                   destAddrStr.val(), sourceAddrStr.val(), ntohs(serverRTPPort.num()), ntohs(serverRTCPPort.num()), destinationTTL,
                   fOurSessionId, timeoutParameterString);
          break;
        }
        case RTP_TCP: {
          // multicast streams can't be sent via TCP
          ourClientConnection->handleCmd_unsupportedTransport();
          break;
        }
        case RAW_UDP: {
          snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
                   "RTSP/1.0 200 OK\r\n"
                   "CSeq: %s\r\n"
                   "%s"
                   "Transport: %s;multicast;destination=%s;source=%s;port=%d;ttl=%d\r\n"
                   "Session: %08X%s\r\n\r\n",
                   ourClientConnection->fCurrentCSeq,
                   dateHeader(),
                   streamingModeString, destAddrStr.val(), sourceAddrStr.val(), ntohs(serverRTPPort.num()), destinationTTL,
                   fOurSessionId, timeoutParameterString);
          break;
        }
      }
    } else {
      switch (streamingMode) {
        case RTP_UDP: {
          snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
                   "RTSP/1.0 200 OK\r\n"
                   "CSeq: %s\r\n"
                   "%s"
                   "Transport: RTP/AVP;unicast;destination=%s;source=%s;client_port=%d-%d;server_port=%d-%d\r\n"
                   "Session: %08X%s\r\n\r\n",
                   ourClientConnection->fCurrentCSeq,
                   dateHeader(),
                   destAddrStr.val(), sourceAddrStr.val(), ntohs(clientRTPPort.num()), ntohs(clientRTCPPort.num()), ntohs(serverRTPPort.num()), ntohs(serverRTCPPort.num()),
                   fOurSessionId, timeoutParameterString);
          break;
        }
        case RTP_TCP: {
          if (!fOurRTSPServer.fAllowStreamingRTPOverTCP) {
            ourClientConnection->handleCmd_unsupportedTransport();
          } else {
            snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
                     "RTSP/1.0 200 OK\r\n"
                     "CSeq: %s\r\n"
                     "%s"
                     "Transport: RTP/AVP/TCP;unicast;destination=%s;source=%s;interleaved=%d-%d\r\n"
                     "Session: %08X%s\r\n\r\n",
                     ourClientConnection->fCurrentCSeq,
                     dateHeader(),
                     destAddrStr.val(), sourceAddrStr.val(), rtpChannelId, rtcpChannelId,
                     fOurSessionId, timeoutParameterString);
          }
          break;
        }
        case RAW_UDP: {
          snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
                   "RTSP/1.0 200 OK\r\n"
                   "CSeq: %s\r\n"
                   "%s"
                   "Transport: %s;unicast;destination=%s;source=%s;client_port=%d;server_port=%d\r\n"
                   "Session: %08X%s\r\n\r\n",
                   ourClientConnection->fCurrentCSeq,
                   dateHeader(),
                   streamingModeString, destAddrStr.val(), sourceAddrStr.val(), ntohs(clientRTPPort.num()), ntohs(serverRTPPort.num()),
                   fOurSessionId, timeoutParameterString);
          break;
        }
      }
    }
    delete[] streamingModeString;
  } while (0);

  delete[] concatenatedStreamName;
}

(1.0) Look up the session by the media stream name (the file name). The session is created while the DESCRIBE command is handled.
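(2.0) Handle a URL that carries no track id. This is allowed when the file contains only one stream. In that case the stream name is taken from urlSuffix (or from urlPreSuffix and urlSuffix joined by "/").
(3.0) If this is the first SETUP command handled by this session, build and initialize an array of streamState structures, one per subsession.
(4.0) Check whether a subsession with the given track id exists; if it does not, report an error.
(5.0) Exception: the URL contains no track id. This is allowed only when there is exactly one subsession.
(6.0) Parse the Transport header to obtain the transport parameters.
(7.0) Parse the Range header (or "x-playNow:"). If either is present, streaming starts right after SETUP.
(8.0) Get the server-side parameters from the subsession. fOurSessionId identifies one client session. It is a random number generated when the client session is created, which this series places in RTSPServer::incomingConnectionHandler (in the listing above, a new session comes from createNewClientSessionWithId() when a SETUP arrives without a session id). RTP and RTCP are created inside getStreamParameters.
(9.0) Build the RTSP response.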
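To make step (6.0) more concrete, here is a simplified sketch of extracting the fields that SETUP cares about from the value of a Transport header. It is not live555's parseTransportHeader: TransportInfo, Mode and parseTransportValue are hypothetical names, and only the "client_port=a-b" and "interleaved=a-b" forms plus the RTP/AVP/TCP marker are handled. The 0xFF "channel not specified" value follows the check in the listing above.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

enum class Mode { RtpUdp, RtpTcp };

// Hypothetical result type for this sketch.
struct TransportInfo {
  Mode mode = Mode::RtpUdp;
  int clientRtpPort = 0;
  int clientRtcpPort = 0;
  int rtpChannel = 0xFF;   // 0xFF means "not specified"
  int rtcpChannel = 0xFF;
};

// Parse the value part of a "Transport:" header, e.g.
// "RTP/AVP;unicast;client_port=5000-5001".
TransportInfo parseTransportValue(const std::string& value) {
  TransportInfo info;
  std::size_t start = 0;
  while (start <= value.size()) {
    std::size_t end = value.find(';', start);
    if (end == std::string::npos) end = value.size();
    const std::string field = value.substr(start, end - start);

    int a = 0, b = 0;
    if (field == "RTP/AVP/TCP") {
      info.mode = Mode::RtpTcp;  // RTP interleaved on the RTSP connection
    } else if (std::sscanf(field.c_str(), "client_port=%d-%d", &a, &b) == 2) {
      info.clientRtpPort = a;
      info.clientRtcpPort = b;
    } else if (std::sscanf(field.c_str(), "interleaved=%d-%d", &a, &b) == 2) {
      info.rtpChannel = a;
      info.rtcpChannel = b;
    }
    start = end + 1;  // continue after the ';'
  }
  return info;
}
```

A TCP request that omits "interleaved=" leaves rtpChannel at 0xFF, which is the situation the SETUP handler repairs by assigning channel ids from fTCPStreamIdCount.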
Next, the handling of the PLAY command:
void RTSPServer::RTSPClientSession
::handleCmd_PLAY(RTSPServer::RTSPClientConnection* ourClientConnection,
                 ServerMediaSubsession* subsession, char const* fullRequestStr) {
  char* rtspURL = fOurRTSPServer.rtspURL(fOurServerMediaSession, ourClientConnection->fClientInputSocket);
  unsigned rtspURLSize = strlen(rtspURL);

  // Parse the client's "Scale:" header, if any: (10.0)
  float scale;
  Boolean sawScaleHeader = parseScaleHeader(fullRequestStr, scale);

  // Try to set the stream's scale factor to this value: (11.0)
  if (subsession == NULL /*aggregate op*/) { // (12.0)
    fOurServerMediaSession->testScaleFactor(scale); // (13.0)
  } else {
    subsession->testScaleFactor(scale);
  }

  char buf[100];
  char* scaleHeader;
  if (!sawScaleHeader) {
    buf[0] = '\0'; // Because we didn't see a Scale: header, don't send one back
  } else {
    sprintf(buf, "Scale: %f\r\n", scale);
  }
  scaleHeader = strDup(buf);

  // Parse the client's "Range:" header, if any: (14.0)
  float duration = 0.0;
  double rangeStart = 0.0, rangeEnd = 0.0;
  char* absStart = NULL; char* absEnd = NULL;
  Boolean startTimeIsNow;
  Boolean sawRangeHeader = parseRangeHeader(fullRequestStr, rangeStart, rangeEnd, absStart, absEnd, startTimeIsNow);

  if (sawRangeHeader && absStart == NULL/*not seeking by 'absolute' time*/) {
    // Use this information, plus the stream's duration (if known), to create our own "Range:" header, for the response:
    duration = subsession == NULL /*aggregate op*/
      ? fOurServerMediaSession->duration() : subsession->duration();
    if (duration < 0.0) {
      // We're an aggregate PLAY, but the subsessions have different durations.
      // Use the largest of these durations in our header
      duration = -duration;
    }

    // Make sure that "rangeStart" and "rangeEnd" (from the client's "Range:" header)
    // have sane values, before we send back our own "Range:" header in our response:
    if (rangeStart < 0.0) rangeStart = 0.0;
    else if (rangeStart > duration) rangeStart = duration;
    if (rangeEnd < 0.0) rangeEnd = 0.0;
    else if (rangeEnd > duration) rangeEnd = duration;
    if ((scale > 0.0 && rangeStart > rangeEnd && rangeEnd > 0.0) ||
        (scale < 0.0 && rangeStart < rangeEnd)) {
      // "rangeStart" and "rangeEnd" were the wrong way around; swap them:
      double tmp = rangeStart;
      rangeStart = rangeEnd;
      rangeEnd = tmp;
    }
  }

  // Create a "RTP-Info:" line. It will get filled in from each subsession's state: (15.0)
  char const* rtpInfoFmt =
    "%s" // "RTP-Info:", plus any preceding rtpInfo items
    "%s" // comma separator, if needed
    "url=%s/%s"
    ";seq=%d"
    ";rtptime=%u"
    ;
  unsigned rtpInfoFmtSize = strlen(rtpInfoFmt);
  char* rtpInfo = strDup("RTP-Info: ");
  unsigned i, numRTPInfoItems = 0;

  // Do any required seeking/scaling on each subsession, before starting streaming. (16.0)
  // (However, we don't do this if the "PLAY" request was for just a single subsession
  // of a multiple-subsession stream; for such streams, seeking/scaling can be done
  // only with an aggregate "PLAY".)
  for (i = 0; i < fNumStreamStates; ++i) {
    if (subsession == NULL /* means: aggregated operation */ || fNumStreamStates == 1) {
      if (fStreamStates[i].subsession != NULL) {
        if (sawScaleHeader) {
          fStreamStates[i].subsession->setStreamScale(fOurSessionId, fStreamStates[i].streamToken, scale); // (17.0)
        }
        if (absStart != NULL) {
          // Special case handling for seeking by 'absolute' time:
          fStreamStates[i].subsession->seekStream(fOurSessionId, fStreamStates[i].streamToken, absStart, absEnd);
        } else {
          // Seeking by relative (NPT) time:
          u_int64_t numBytes;
          if (!sawRangeHeader || startTimeIsNow) {
            // We're resuming streaming without seeking, so we just do a 'null' seek
            // (to get our NPT, and to specify when to end streaming):
            fStreamStates[i].subsession->nullSeekStream(fOurSessionId, fStreamStates[i].streamToken,
                                                        rangeEnd, numBytes);
          } else {
            // We do a real 'seek':
            double streamDuration = 0.0; // by default; means: stream until the end of the media (18.0)
            if (rangeEnd > 0.0 && (rangeEnd+0.001) < duration) {
              // the 0.001 is because we limited the values to 3 decimal places
              // We want the stream to end early. Set the duration we want:
              streamDuration = rangeEnd - rangeStart;
              if (streamDuration < 0.0) streamDuration = -streamDuration; // (19.0) should happen only if scale < 0.0
            }
            fStreamStates[i].subsession->seekStream(fOurSessionId, fStreamStates[i].streamToken, // (20.0)
                                                    rangeStart, streamDuration, numBytes);
          }
        }
      }
    }
  }

  // Create the "Range:" header that we'll send back in our response.
  // (Note that we do this after seeking, in case the seeking operation changed the range start time.)
  if (absStart != NULL) {
    // We're seeking by 'absolute' time:
    if (absEnd == NULL) {
      sprintf(buf, "Range: clock=%s-\r\n", absStart);
    } else {
      sprintf(buf, "Range: clock=%s-%s\r\n", absStart, absEnd);
    }
    delete[] absStart; delete[] absEnd;
  } else {
    // We're seeking by relative (NPT) time:
    if (!sawRangeHeader || startTimeIsNow) {
      // We didn't seek, so in our response, begin the range with the current NPT (normal play time):
      float curNPT = 0.0;
      for (i = 0; i < fNumStreamStates; ++i) {
        if (subsession == NULL /* means: aggregated operation */
            || subsession == fStreamStates[i].subsession) {
          if (fStreamStates[i].subsession == NULL) continue;
          float npt = fStreamStates[i].subsession->getCurrentNPT(fStreamStates[i].streamToken);
          if (npt > curNPT) curNPT = npt;
          // Note: If this is an aggregate "PLAY" on a multi-subsession stream,
          // then it's conceivable that the NPTs of each subsession may differ
          // (if there has been a previous seek on just one subsession).
          // In this (unusual) case, we just return the largest NPT; I hope that turns out OK...
        }
      }
      rangeStart = curNPT;
    }

    if (rangeEnd == 0.0 && scale >= 0.0) {
      sprintf(buf, "Range: npt=%.3f-\r\n", rangeStart);
    } else {
      sprintf(buf, "Range: npt=%.3f-%.3f\r\n", rangeStart, rangeEnd);
    }
  }
  char* rangeHeader = strDup(buf);

  // Now, start streaming:
  for (i = 0; i < fNumStreamStates; ++i) {
    if (subsession == NULL /* means: aggregated operation */
        || subsession == fStreamStates[i].subsession) {
      unsigned short rtpSeqNum = 0;
      unsigned rtpTimestamp = 0;
      if (fStreamStates[i].subsession == NULL) continue;
      fStreamStates[i].subsession->startStream(fOurSessionId, // (21.0)
                                               fStreamStates[i].streamToken,
                                               (TaskFunc*)noteClientLiveness, this,
                                               rtpSeqNum, rtpTimestamp,
                                               RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
      const char *urlSuffix = fStreamStates[i].subsession->trackId();
      char* prevRTPInfo = rtpInfo;
      unsigned rtpInfoSize = rtpInfoFmtSize
        + strlen(prevRTPInfo)
        + 1
        + rtspURLSize + strlen(urlSuffix)
        + 5 /*max unsigned short len*/
        + 10 /*max unsigned (32-bit) len*/
        + 2 /*allows for trailing \r\n at final end of string*/;
      rtpInfo = new char[rtpInfoSize]; // (22.0)
      sprintf(rtpInfo, rtpInfoFmt,
              prevRTPInfo,
              numRTPInfoItems++ == 0 ? "" : ",",
              rtspURL, urlSuffix,
              rtpSeqNum,
              rtpTimestamp
              );
      delete[] prevRTPInfo;
    }
  }
  if (numRTPInfoItems == 0) {
    rtpInfo[0] = '\0';
  } else {
    unsigned rtpInfoLen = strlen(rtpInfo);
    rtpInfo[rtpInfoLen] = '\r';
    rtpInfo[rtpInfoLen+1] = '\n';
    rtpInfo[rtpInfoLen+2] = '\0';
  }

  // Fill in the response: (23.0)
  snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
           "RTSP/1.0 200 OK\r\n"
           "CSeq: %s\r\n"
           "%s"
           "%s"
           "%s"
           "Session: %08X\r\n"
           "%s\r\n",
           ourClientConnection->fCurrentCSeq,
           dateHeader(),
           scaleHeader,
           rangeHeader,
           fOurSessionId,
           rtpInfo);
  delete[] rtpInfo; delete[] rangeHeader;
  delete[] scaleHeader; delete[] rtspURL;
}

(10.0) Parse the "Scale:" header, which specifies the playback rate: scale = 1 is normal playback, a value greater than 1 is fast forward, and a value less than 0 is reverse playback.
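(11.0) Test whether the requested scale value can be satisfied; the value of scale may be changed in the process.
(12.0) In the aggregate case, no specific subsession has been determined yet.
(13.0) Test the scale value (note how this function is called).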
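(14.0) Parse the "Range:" header, which gives the time range to play. For example, Range: npt=0.000- plays from time 0 through to the end. A PLAY request without a Range header is also legal: it plays from the beginning of the media stream, unless the stream has been paused, in which case delivery resumes at the pause point.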
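(15.0) Create the "RTP-Info:" line of the response.
(16.0) Perform the requested seeking/scaling operations on each subsession.
(17.0) Set the subsession's scale value.
(18.0) Compute the stream's play duration, streamDuration.
(19.0) In this case, reverse playback (rewind) is performed.
(20.0) Set the playback time range on each subsession.
(21.0) Start data transmission on each subsession; that is, playback begins.
(22.0) Append the subsession's information to the "RTP-Info:" line.
(23.0) Assemble the response packet.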
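The "Range:" handling in note (14.0) and the range line written into the response can be tried out in isolation. The sketch below is only an illustration: parseNptRange and formatNptRange are hypothetical helper names, not live555 functions, and only the relative npt= form is covered (absolute clock= ranges are left out).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>

// Hypothetical helper (not a live555 function): parses "npt=<start>-" or
// "npt=<start>-<end>". Returns false if no start time could be read.
// An open-ended range leaves `end` at 0.0, the same "no end" convention
// that the PLAY handler applies to rangeEnd.
bool parseNptRange(const char* value, double& start, double& end) {
  start = 0.0;
  end = 0.0;
  double s = 0.0, e = 0.0;
  int n = std::sscanf(value, "npt=%lf-%lf", &s, &e);
  if (n < 1) return false;   // not an npt range, or no start time
  start = s;
  if (n == 2) end = e;       // an explicit end time was given
  return true;
}

// Hypothetical helper: builds the "Range:" response line in the same way
// as the sprintf calls in the listing: open-ended when there is no end
// time and playback is not reversed, otherwise with both bounds.
std::string formatNptRange(double rangeStart, double rangeEnd, float scale) {
  char buf[1024];
  int n;
  if (rangeEnd == 0.0 && scale >= 0.0f) {
    n = std::snprintf(buf, sizeof buf, "Range: npt=%.3f-\r\n", rangeStart);
  } else {
    n = std::snprintf(buf, sizeof buf, "Range: npt=%.3f-%.3f\r\n",
                      rangeStart, rangeEnd);
  }
  assert(n > 0 && static_cast<std::size_t>(n) < sizeof buf);  // line fits
  return std::string(buf);
}
```

With these helpers, a request value of npt=0.000- parses to start 0 with no end, and formatting it back at scale 1 yields an open-ended range line. A negative scale always produces both bounds, which matches the rewind case in note (19.0).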
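Above we covered the handling of the SETUP and PLAY commands. The previous chapter explained how the RTSP service runs, but it did not touch on RTP and RTCP. RTP and RTCP are created while the SETUP command is handled, and data transmission starts while the PLAY command is handled. Live555's data flow is analyzed in the next chapter; what we analyze next is how RTP and RTCP are set up, which is implemented in the following function.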
void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
                      netAddressBits clientAddress,
                      Port const& clientRTPPort,
                      Port const& clientRTCPPort,
                      int tcpSocketNum,
                      unsigned char rtpChannelId,
                      unsigned char rtcpChannelId,
                      netAddressBits& destinationAddress,
                      u_int8_t& /*destinationTTL*/,
                      Boolean& isMulticast,
                      Port& serverRTPPort,
                      Port& serverRTCPPort,
                      void*& streamToken) {
  if (destinationAddress == 0) destinationAddress = clientAddress;
  struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
  isMulticast = False;

  if (fLastStreamToken != NULL && fReuseFirstSource) {  // (30.0)
    // Special case: Rather than creating a new 'StreamState',
    // we reuse the one that we've already created:
    serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
    serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
    ++((StreamState*)fLastStreamToken)->referenceCount();  // (31.0)
    streamToken = fLastStreamToken;
  } else {
    // Normal case: Create a new media source: (32.0)
    unsigned streamBitrate;
    FramedSource* mediaSource
      = createNewStreamSource(clientSessionId, streamBitrate);  // (33.0)

    // Create 'groupsock' and 'sink' objects for the destination,
    // using previously unused server port numbers:
    RTPSink* rtpSink = NULL;
    BasicUDPSink* udpSink = NULL;
    Groupsock* rtpGroupsock = NULL;
    Groupsock* rtcpGroupsock = NULL;

    if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
      portNumBits serverPortNum;
      if (clientRTCPPort.num() == 0) {
        // We're streaming raw UDP (not RTP). Create a single groupsock:
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
          if (rtpGroupsock->socketNum() >= 0) break; // success
        }

        udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
      } else {  // (34.0)
        // Normal case: We're streaming RTP (over UDP or TCP). Create a pair of
        // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
        // (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
        NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
        for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
          struct in_addr dummyAddr; dummyAddr.s_addr = 0;

          serverRTPPort = serverPortNum;
          rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
          if (rtpGroupsock->socketNum() < 0) {
            delete rtpGroupsock;
            continue; // try again
          }

          if (fMultiplexRTCPWithRTP) {
            // Use the RTP 'groupsock' object for RTCP as well:
            serverRTCPPort = serverRTPPort;
            rtcpGroupsock = rtpGroupsock;
          } else {
            // Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
            serverRTCPPort = ++serverPortNum;
            rtcpGroupsock = createGroupsock(dummyAddr, serverRTCPPort);
            if (rtcpGroupsock->socketNum() < 0) {
              delete rtpGroupsock;
              delete rtcpGroupsock;
              continue; // try again
            }
          }

          break; // success
        }

        unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
        // (35.0)
        rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
        if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
      }

      // Turn off the destinations for each groupsock. They'll get set later
      // (unless TCP is used instead):
      if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
      if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();

      if (rtpGroupsock != NULL) {  // (36.0)
        // Try to use a big send buffer for RTP - at least 0.1 second of
        // specified bandwidth and at least 50 KB
        unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
        if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
        increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
      }
    }

    // Set up the state of the stream. The stream will get started later: (37.0)
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
                        streamBitrate, mediaSource,
                        rtpGroupsock, rtcpGroupsock);
  }

  // Record these destinations as being for this client session id: (38.0)
  Destinations* destinations;
  if (tcpSocketNum < 0) { // UDP
    destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
  } else { // TCP
    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
  }
  fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}

(30.0) When fReuseFirstSource is True, there is no need to create new source, sink, or groupsock instances; only the client's address needs to be recorded.
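(31.0) Increment the reference count.
(32.0) In the normal case, create a new media source.
(33.0) Create the source. One was also created while handling the DESCRIBE command, in OnDemandServerMediaSubsession::sdpLines(), but there the clientSessionId argument was 0.
(34.0) Create a pair of groupsock instances, one for RTP and one for RTCP. The RTP and RTCP port numbers are adjacent, and the RTP port number is even. The initial port is fInitialPortNum = 6970, which is the default argument of the OnDemandServerMediaSubsession constructor.
(35.0) Create the RTPSink.
(36.0) Reconfigure the send buffer size of the socket used for RTP.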
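(37.0) Create the stream's state object (the stream token), which records how the sink, source, groupsocks, and so on relate to each other. Note that live555 defines two stream-state types. The StreamState used here is a class. RTSPServer separately defines an internal struct, streamState, whose streamToken member points to this StreamState instance.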
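(38.0) The Destinations class holds the destination address, RTP port, and RTCP port. Each instance is stored under its clientSessionId in the hash table fDestinationsHashTable, which is a member of the OnDemandServerMediaSubsession class.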
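The two small calculations behind notes (35.0) and (36.0) are easy to check by hand. The sketch below restates them as standalone functions; dynamicPayloadType and rtpSendBufferSize are hypothetical names introduced only for illustration, and the bitrate is assumed to be in kbps, as the comment in the listing implies.

```cpp
#include <cassert>

// Hypothetical helper: dynamic RTP payload type for a 1-based track number,
// following `96 + trackNumber()-1` in the listing.
unsigned char dynamicPayloadType(unsigned trackNumber) {
  assert(trackNumber >= 1 && trackNumber <= 32);  // dynamic types are 96..127
  return static_cast<unsigned char>(96 + trackNumber - 1);
}

// Hypothetical helper: RTP send-buffer size in bytes for a bitrate in kbps.
// 1 kbps for 0.1 s is 1000 bits * 0.1 / 8 = 12.5 bytes, hence "* 25 / 2".
// The result is never smaller than 50 KB.
unsigned rtpSendBufferSize(unsigned streamBitrateKbps) {
  const unsigned kMinBytes = 50 * 1024;
  unsigned bytes = streamBitrateKbps * 25 / 2;
  return bytes < kMinBytes ? kMinBytes : bytes;
}
```

For a 500 kbps stream the 0.1 second rule gives only 6250 bytes, so the 50 KB floor (51200 bytes) applies. At 8000 kbps the computed value of 100000 bytes is used instead.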
At this point RTSP, RTP, and RTCP have all been set up; what follows is the data transmission.