您的位置:首页 > 理论基础 > 计算机网络

Live555学习笔记(三)—— RTSP命令处理及RTP,RTCP服务建立

2016-07-04 20:50 531 查看
在上一章中我们已经知道了RTSP服务运作,RTSP创建之后就会一直调用RTSPServer::RTSPClientConnection::handleRequestBytes函数查询客户端的命令(OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, GET_PARAMETER, SET_PARAMETER),如果接收到命令则进入相应的命令处理函数执行。其函数定义如下:

void RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead) {
int numBytesRemaining = 0;
++fRecursionCount;

do {
RTSPServer::RTSPClientSession* clientSession = NULL;

if (newBytesRead < 0 || (unsigned)newBytesRead >= fRequestBufferBytesLeft) {
// Either the client socket has died, or the request was too big for us.
// Terminate this connection:
#ifdef DEBUG
fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() read %d new bytes (of %d); terminating connection!\n", this, newBytesRead, fRequestBufferBytesLeft);
#endif
fIsActive = False;
break;
}

Boolean endOfMsg = False;
unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];
#ifdef DEBUG
ptr[newBytesRead] = '\0';
fprintf(stderr, "RTSPClientConnection[%p]::handleRequestBytes() %s %d new bytes:%s\n",
this, numBytesRemaining > 0 ? "processing" : "read", newBytesRead, ptr);
#endif

if (fClientOutputSocket != fClientInputSocket && numBytesRemaining == 0) {
// We're doing RTSP-over-HTTP tunneling, and input commands are assumed to have been Base64-encoded.
// We therefore Base64-decode as much of this new data as we can (i.e., up to a multiple of 4 bytes).

// But first, we remove any whitespace that may be in the input data:
unsigned toIndex = 0;
for (int fromIndex = 0; fromIndex < newBytesRead; ++fromIndex) {
char c = ptr[fromIndex];
if (!(c == ' ' || c == '\t' || c == '\r' || c == '\n')) { // not 'whitespace': space,tab,CR,NL
ptr[toIndex++] = c;
}
}
newBytesRead = toIndex;

unsigned numBytesToDecode = fBase64RemainderCount + newBytesRead;
unsigned newBase64RemainderCount = numBytesToDecode%4;
numBytesToDecode -= newBase64RemainderCount;
if (numBytesToDecode > 0) {
ptr[newBytesRead] = '\0';
unsigned decodedSize;
unsigned char* decodedBytes = base64Decode((char const*)(ptr-fBase64RemainderCount), numBytesToDecode, decodedSize);
#ifdef DEBUG
fprintf(stderr, "Base64-decoded %d input bytes into %d new bytes:", numBytesToDecode, decodedSize);
for (unsigned k = 0; k < decodedSize; ++k) fprintf(stderr, "%c", decodedBytes[k]);
fprintf(stderr, "\n");
#endif

// Copy the new decoded bytes in place of the old ones (we can do this because there are fewer decoded bytes than original):
unsigned char* to = ptr-fBase64RemainderCount;
for (unsigned i = 0; i < decodedSize; ++i) *to++ = decodedBytes[i];

// Then copy any remaining (undecoded) bytes to the end:
for (unsigned j = 0; j < newBase64RemainderCount; ++j) *to++ = (ptr-fBase64RemainderCount+numBytesToDecode)[j];

newBytesRead = decodedSize - fBase64RemainderCount + newBase64RemainderCount;
// adjust to allow for the size of the new decoded data (+ remainder)
delete[] decodedBytes;
}
fBase64RemainderCount = newBase64RemainderCount;
}

unsigned char* tmpPtr = fLastCRLF + 2;
if (fBase64RemainderCount == 0) { // no more Base-64 bytes remain to be read/decoded
// Look for the end of the message: <CR><LF><CR><LF>
if (tmpPtr < fRequestBuffer) tmpPtr = fRequestBuffer;
while (tmpPtr < &ptr[newBytesRead-1]) {
if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') {
if (tmpPtr - fLastCRLF == 2) { // This is it:
endOfMsg = True;
break;
}
fLastCRLF = tmpPtr;
}
++tmpPtr;
}
}

fRequestBufferBytesLeft -= newBytesRead;
fRequestBytesAlreadySeen += newBytesRead;

if (!endOfMsg) break; // subsequent reads will be needed to complete the request

// Parse the request string into command name and 'CSeq', then handle the command:
fRequestBuffer[fRequestBytesAlreadySeen] = '\0';
char cmdName[RTSP_PARAM_STRING_MAX];
char urlPreSuffix[RTSP_PARAM_STRING_MAX];
char urlSuffix[RTSP_PARAM_STRING_MAX];
char cseq[RTSP_PARAM_STRING_MAX];
char sessionIdStr[RTSP_PARAM_STRING_MAX];
unsigned contentLength = 0;
fLastCRLF[2] = '\0'; // temporarily, for parsing
Boolean parseSucceeded = parseRTSPRequestString((char*)fRequestBuffer, fLastCRLF+2 - fRequestBuffer,
cmdName, sizeof cmdName,
urlPreSuffix, sizeof urlPreSuffix,
urlSuffix, sizeof urlSuffix,
cseq, sizeof cseq,
sessionIdStr, sizeof sessionIdStr,
contentLength);
fLastCRLF[2] = '\r'; // restore its value
Boolean playAfterSetup = False;
if (parseSucceeded) {
#ifdef DEBUG
fprintf(stderr, "parseRTSPRequestString() succeeded, returning cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\", CSeq \"%s\", Content-Length %u, with %d bytes following the message.\n", cmdName, urlPreSuffix, urlSuffix, cseq, contentLength, ptr + newBytesRead - (tmpPtr + 2));
#endif
// If there was a "Content-Length:" header, then make sure we've received all of the data that it specified:
if (ptr + newBytesRead < tmpPtr + 2 + contentLength) break; // we still need more data; subsequent reads will give it to us

// If the request included a "Session:" id, and it refers to a client session that's
// current ongoing, then use this command to indicate 'liveness' on that client session:
Boolean const requestIncludedSessionId = sessionIdStr[0] != '\0';
if (requestIncludedSessionId) {
clientSession
= (RTSPServer::RTSPClientSession*)(fOurRTSPServer.lookupClientSession(sessionIdStr));
if (clientSession != NULL) clientSession->noteLiveness();
}

// We now have a complete RTSP request.
// Handle the specified command (beginning with commands that are session-independent):
fCurrentCSeq = cseq;
if (strcmp(cmdName, "OPTIONS") == 0) {
// If the "OPTIONS" command included a "Session:" id for a session that doesn't exist,
// then treat this as an error:
if (requestIncludedSessionId && clientSession == NULL) {
handleCmd_sessionNotFound();
} else {
// Normal case:
handleCmd_OPTIONS();
}
} else if (urlPreSuffix[0] == '\0' && urlSuffix[0] == '*' && urlSuffix[1] == '\0') {
// The special "*" URL means: an operation on the entire server.  This works only for GET_PARAMETER and SET_PARAMETER:
if (strcmp(cmdName, "GET_PARAMETER") == 0) {
handleCmd_GET_PARAMETER((char const*)fRequestBuffer);
} else if (strcmp(cmdName, "SET_PARAMETER") == 0) {
handleCmd_SET_PARAMETER((char const*)fRequestBuffer);
} else {
handleCmd_notSupported();
}
} else if (strcmp(cmdName, "DESCRIBE") == 0) {
handleCmd_DESCRIBE(urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
} else if (strcmp(cmdName, "SETUP") == 0) {
Boolean areAuthenticated = True;

if (!requestIncludedSessionId) {
// No session id was present in the request.
// So create a new "RTSPClientSession" object for this request.

// But first, make sure that we're authenticated to perform this command:
char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];
// enough space for urlPreSuffix/urlSuffix'\0'
urlTotalSuffix[0] = '\0';
if (urlPreSuffix[0] != '\0') {
strcat(urlTotalSuffix, urlPreSuffix);
strcat(urlTotalSuffix, "/");
}
strcat(urlTotalSuffix, urlSuffix);
if (authenticationOK("SETUP", urlTotalSuffix, (char const*)fRequestBuffer)) {
clientSession
= (RTSPServer::RTSPClientSession*)fOurRTSPServer.createNewClientSessionWithId();
} else {
areAuthenticated = False;
}
}
if (clientSession != NULL) {
clientSession->handleCmd_SETUP(this, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
playAfterSetup = clientSession->fStreamAfterSETUP;
} else if (areAuthenticated) {
handleCmd_sessionNotFound();
}
} else if (strcmp(cmdName, "TEARDOWN") == 0
|| strcmp(cmdName, "PLAY") == 0
|| strcmp(cmdName, "PAUSE") == 0
|| strcmp(cmdName, "GET_PARAMETER") == 0
|| strcmp(cmdName, "SET_PARAMETER") == 0) {
if (clientSession != NULL) {
clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
} else {
handleCmd_sessionNotFound();
}
} else if (strcmp(cmdName, "REGISTER") == 0) {
// Because - unlike other commands - an implementation of this command needs
// the entire URL, we re-parse the command to get it:
char* url = strDupSize((char*)fRequestBuffer);
if (sscanf((char*)fRequestBuffer, "%*s %s", url) == 1) {
// Check for special command-specific parameters in a "Transport:" header:
Boolean reuseConnection, deliverViaTCP;
char* proxyURLSuffix;
parseTransportHeaderForREGISTER((const char*)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);

handleCmd_REGISTER(url, urlSuffix, (char const*)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
delete[] proxyURLSuffix;
} else {
handleCmd_bad();
}
delete[] url;
} else {
// The command is one that we don't handle:
handleCmd_notSupported();
}
} else {
#ifdef DEBUG
fprintf(stderr, "parseRTSPRequestString() failed; checking now for HTTP commands (for RTSP-over-HTTP tunneling)...\n");
#endif
// The request was not (valid) RTSP, but check for a special case: HTTP commands (for setting up RTSP-over-HTTP tunneling):
char sessionCookie[RTSP_PARAM_STRING_MAX];
char acceptStr[RTSP_PARAM_STRING_MAX];
*fLastCRLF = '\0'; // temporarily, for parsing
parseSucceeded = parseHTTPRequestString(cmdName, sizeof cmdName,
urlSuffix, sizeof urlPreSuffix,
sessionCookie, sizeof sessionCookie,
acceptStr, sizeof acceptStr);
*fLastCRLF = '\r';
if (parseSucceeded) {
#ifdef DEBUG
fprintf(stderr, "parseHTTPRequestString() succeeded, returning cmdName \"%s\", urlSuffix \"%s\", sessionCookie \"%s\", acceptStr \"%s\"\n", cmdName, urlSuffix, sessionCookie, acceptStr);
#endif
// Check that the HTTP command is valid for RTSP-over-HTTP tunneling: There must be a 'session cookie'.
Boolean isValidHTTPCmd = True;
if (strcmp(cmdName, "OPTIONS") == 0) {
handleHTTPCmd_OPTIONS();
} else if (sessionCookie[0] == '\0') {
// There was no "x-sessioncookie:" header.  If there was an "Accept: application/x-rtsp-tunnelled" header,
// then this is a bad tunneling request.  Otherwise, assume that it's an attempt to access the stream via HTTP.
if (strcmp(acceptStr, "application/x-rtsp-tunnelled") == 0) {
isValidHTTPCmd = False;
} else {
handleHTTPCmd_StreamingGET(urlSuffix, (char const*)fRequestBuffer);
}
} else if (strcmp(cmdName, "GET") == 0) {
handleHTTPCmd_TunnelingGET(sessionCookie);
} else if (strcmp(cmdName, "POST") == 0) {
// We might have received additional data following the HTTP "POST" command - i.e., the first Base64-encoded RTSP command.
// Check for this, and handle it if it exists:
unsigned char const* extraData = fLastCRLF+4;
unsigned extraDataSize = &fRequestBuffer[fRequestBytesAlreadySeen] - extraData;
if (handleHTTPCmd_TunnelingPOST(sessionCookie, extraData, extraDataSize)) {
// We don't respond to the "POST" command, and we go away:
fIsActive = False;
break;
}
} else {
isValidHTTPCmd = False;
}
if (!isValidHTTPCmd) {
handleHTTPCmd_notSupported();
}
} else {
#ifdef DEBUG
fprintf(stderr, "parseHTTPRequestString() failed!\n");
#endif
handleCmd_bad();
}
}

#ifdef DEBUG
fprintf(stderr, "sending response: %s", fResponseBuffer);
#endif
send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);

if (playAfterSetup) {
// The client has asked for streaming to commence now, rather than after a
// subsequent "PLAY" command.  So, simulate the effect of a "PLAY" command:
clientSession->handleCmd_withinSession(this, "PLAY", urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
}

// Check whether there are extra bytes remaining in the buffer, after the end of the request (a rare case).
// If so, move them to the front of our buffer, and keep processing it, because it might be a following, pipelined request.
unsigned requestSize = (fLastCRLF+4-fRequestBuffer) + contentLength;
numBytesRemaining = fRequestBytesAlreadySeen - requestSize;
resetRequestBuffer(); // to prepare for any subsequent request

if (numBytesRemaining > 0) {
memmove(fRequestBuffer, &fRequestBuffer[requestSize], numBytesRemaining);
newBytesRead = numBytesRemaining;
}
} while (numBytesRemaining > 0);

--fRecursionCount;
if (!fIsActive) {
if (fRecursionCount > 0) closeSockets(); else delete this;
// Note: The "fRecursionCount" test is for a pathological situation where we reenter the event loop and get called recursively
// while handling a command (e.g., while handling a "DESCRIBE", to get a SDP description).
// In such a case we don't want to actually delete ourself until we leave the outermost call.
}
}
函数比较长,不做详细解析,其命令与处理函数的关系如下图所示:



涉及到的命令比较多,这里只对setup和play命令进行处理:

void RTSPServer::RTSPClientSession
::handleCmd_SETUP(RTSPServer::RTSPClientConnection* ourClientConnection,
char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr) {
// Normally, "urlPreSuffix" should be the session (stream) name, and "urlSuffix" should be the subsession (track) name.
// However (being "liberal in what we accept"), we also handle 'aggregate' SETUP requests (i.e., without a track name),
// in the special case where we have only a single track.  I.e., in this case, we also handle:
//    "urlPreSuffix" is empty and "urlSuffix" is the session (stream) name, or
//    "urlPreSuffix" concatenated with "urlSuffix" (with "/" inbetween) is the session (stream) name.
char const* streamName = urlPreSuffix; // in the normal case
char const* trackId = urlSuffix; // in the normal case
char* concatenatedStreamName = NULL; // in the normal case

do {
// First, make sure the specified stream name exists:
ServerMediaSession* sms
= fOurServer.lookupServerMediaSession(streamName, fOurServerMediaSession == NULL);                  (1.0)
if (sms == NULL) {                                                                                    (2.0)
// Check for the special case (noted above), before we give up:
if (urlPreSuffix[0] == '\0') {
streamName = urlSuffix;
} else {
concatenatedStreamName = new char[strlen(urlPreSuffix) + strlen(urlSuffix) + 2]; // allow for the "/" and the trailing '\0'
sprintf(concatenatedStreamName, "%s/%s", urlPreSuffix, urlSuffix);
streamName = concatenatedStreamName;
}
trackId = NULL;

// Check again:
sms = fOurServer.lookupServerMediaSession(streamName, fOurServerMediaSession == NULL);
}
if (sms == NULL) {
if (fOurServerMediaSession == NULL) {
// The client asked for a stream that doesn't exist (and this session descriptor has not been used before):
ourClientConnection->handleCmd_notFound();
} else {
// The client asked for a stream that doesn't exist, but using a stream id for a stream that does exist. Bad request:
ourClientConnection->handleCmd_bad();
}
break;
} else {
if (fOurServerMediaSession == NULL) {
// We're accessing the "ServerMediaSession" for the first time.
fOurServerMediaSession = sms;
fOurServerMediaSession->incrementReferenceCount();
} else if (sms != fOurServerMediaSession) {
// The client asked for a stream that's different from the one originally requested for this stream id.  Bad request:
ourClientConnection->handleCmd_bad();
break;
}
}

if (fStreamStates == NULL) {                                                                         (3.0)
// This is the first "SETUP" for this session.  Set up our array of states for all of this session's subsessions (tracks):
ServerMediaSubsessionIterator iter(*fOurServerMediaSession);
for (fNumStreamStates = 0; iter.next() != NULL; ++fNumStreamStates) {} // begin by counting the number of subsessions (tracks)

fStreamStates = new struct streamState[fNumStreamStates];

iter.reset();
ServerMediaSubsession* subsession;
for (unsigned i = 0; i < fNumStreamStates; ++i) {
subsession = iter.next();
fStreamStates[i].subsession = subsession;
fStreamStates[i].tcpSocketNum = -1; // for now; may get set for RTP-over-TCP streaming
fStreamStates[i].streamToken = NULL; // for now; it may be changed by the "getStreamParameters()" call that comes later
}
}

// Look up information for the specified subsession (track):                                       (4.0)
ServerMediaSubsession* subsession = NULL;
unsigned trackNum;
if (trackId != NULL && trackId[0] != '\0') { // normal case
for (trackNum = 0; trackNum < fNumStreamStates; ++trackNum) {
subsession = fStreamStates[trackNum].subsession;
if (subsession != NULL && strcmp(trackId, subsession->trackId()) == 0) break;
}
if (trackNum >= fNumStreamStates) {
// The specified track id doesn't exist, so this request fails:
ourClientConnection->handleCmd_notFound();
break;
}
} else {
// Weird case: there was no track id in the URL.                                                  (5.0)
// This works only if we have only one subsession:
if (fNumStreamStates != 1 || fStreamStates[0].subsession == NULL) {
ourClientConnection->handleCmd_bad();
break;
}
trackNum = 0;
subsession = fStreamStates[trackNum].subsession;
}
// ASSERT: subsession != NULL

void*& token = fStreamStates[trackNum].streamToken; // alias
if (token != NULL) {
// We already handled a "SETUP" for this track (to the same client),
// so stop any existing streaming of it, before we set it up again:
subsession->pauseStream(fOurSessionId, token);
fOurRTSPServer.unnoteTCPStreamingOnSocket(fStreamStates[trackNum].tcpSocketNum, this, trackNum);
subsession->deleteStream(fOurSessionId, token);
}

// Look for a "Transport:" header in the request string, to extract client parameters:              (6.0)
StreamingMode streamingMode;
char* streamingModeString = NULL; // set when RAW_UDP streaming is specified
char* clientsDestinationAddressStr;
u_int8_t clientsDestinationTTL;
portNumBits clientRTPPortNum, clientRTCPPortNum;
unsigned char rtpChannelId, rtcpChannelId;
parseTransportHeader(fullRequestStr, streamingMode, streamingModeString,
clientsDestinationAddressStr, clientsDestinationTTL,
clientRTPPortNum, clientRTCPPortNum,
rtpChannelId, rtcpChannelId);
if ((streamingMode == RTP_TCP && rtpChannelId == 0xFF) ||
(streamingMode != RTP_TCP && ourClientConnection->fClientOutputSocket != ourClientConnection->fClientInputSocket)) {
// An anomolous situation, caused by a buggy client.  Either:
//     1/ TCP streaming was requested, but with no "interleaving=" fields.  (QuickTime Player sometimes does this.), or
//     2/ TCP streaming was not requested, but we're doing RTSP-over-HTTP tunneling (which implies TCP streaming).
// In either case, we assume TCP streaming, and set the RTP and RTCP channel ids to proper values:
streamingMode = RTP_TCP;
rtpChannelId = fTCPStreamIdCount; rtcpChannelId = fTCPStreamIdCount+1;
}
if (streamingMode == RTP_TCP) fTCPStreamIdCount += 2;

Port clientRTPPort(clientRTPPortNum);
Port clientRTCPPort(clientRTCPPortNum);

// Next, check whether a "Range:" or "x-playNow:" header is present in the request.                 (7.0)
// This isn't legal, but some clients do this to combine "SETUP" and "PLAY":
double rangeStart = 0.0, rangeEnd = 0.0;
char* absStart = NULL; char* absEnd = NULL;
Boolean startTimeIsNow;
if (parseRangeHeader(fullRequestStr, rangeStart, rangeEnd, absStart, absEnd, startTimeIsNow)) {
delete[] absStart; delete[] absEnd;
fStreamAfterSETUP = True;
} else if (parsePlayNowHeader(fullRequestStr)) {
fStreamAfterSETUP = True;
} else {
fStreamAfterSETUP = False;
}

// Then, get server parameters from the 'subsession':
if (streamingMode == RTP_TCP) {
// Note that we'll be streaming over the RTSP TCP connection:
fStreamStates[trackNum].tcpSocketNum = ourClientConnection->fClientOutputSocket;
fOurRTSPServer.noteTCPStreamingOnSocket(fStreamStates[trackNum].tcpSocketNum, this, trackNum);
}
netAddressBits destinationAddress = 0;
u_int8_t destinationTTL = 255;
#ifdef RTSP_ALLOW_CLIENT_DESTINATION_SETTING
if (clientsDestinationAddressStr != NULL) {
// Use the client-provided "destination" address.
// Note: This potentially allows the server to be used in denial-of-service
// attacks, so don't enable this code unless you're sure that clients are
// trusted.
destinationAddress = our_inet_addr(clientsDestinationAddressStr);
}
// Also use the client-provided TTL.
destinationTTL = clientsDestinationTTL;
#endif
delete[] clientsDestinationAddressStr;
Port serverRTPPort(0);
Port serverRTCPPort(0);

// Make sure that we transmit on the same interface that's used by the client (in case we're a multi-homed server):
struct sockaddr_in sourceAddr; SOCKLEN_T namelen = sizeof sourceAddr;
getsockname(ourClientConnection->fClientInputSocket, (struct sockaddr*)&sourceAddr, &namelen);
netAddressBits origSendingInterfaceAddr = SendingInterfaceAddr;
netAddressBits origReceivingInterfaceAddr = ReceivingInterfaceAddr;
// NOTE: The following might not work properly, so we ifdef it out for now:
#ifdef HACK_FOR_MULTIHOMED_SERVERS
ReceivingInterfaceAddr = SendingInterfaceAddr = sourceAddr.sin_addr.s_addr;
#endif

subsession->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr,   (8.0)
clientRTPPort, clientRTCPPort,
fStreamStates[trackNum].tcpSocketNum, rtpChannelId, rtcpChannelId,
destinationAddress, destinationTTL, fIsMulticast,
serverRTPPort, serverRTCPPort,
fStreamStates[trackNum].streamToken);
SendingInterfaceAddr = origSendingInterfaceAddr;                                                   (9.0)
ReceivingInterfaceAddr = origReceivingInterfaceAddr;

AddressString destAddrStr(destinationAddress);
AddressString sourceAddrStr(sourceAddr);
char timeoutParameterString[100];
if (fOurRTSPServer.fReclamationSeconds > 0) {
sprintf(timeoutParameterString, ";timeout=%u", fOurRTSPServer.fReclamationSeconds);
} else {
timeoutParameterString[0] = '\0';
}
if (fIsMulticast) {
switch (streamingMode) {
case RTP_UDP: {
snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
"RTSP/1.0 200 OK\r\n"
"CSeq: %s\r\n"
"%s"
"Transport: RTP/AVP;multicast;destination=%s;source=%s;port=%d-%d;ttl=%d\r\n"
"Session: %08X%s\r\n\r\n",
ourClientConnection->fCurrentCSeq,
dateHeader(),
destAddrStr.val(), sourceAddrStr.val(), ntohs(serverRTPPort.num()), ntohs(serverRTCPPort.num()), destinationTTL,
fOurSessionId, timeoutParameterString);
break;
}
case RTP_TCP: {
// multicast streams can't be sent via TCP
ourClientConnection->handleCmd_unsupportedTransport();
break;
}
case RAW_UDP: {
snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
"RTSP/1.0 200 OK\r\n"
"CSeq: %s\r\n"
"%s"
"Transport: %s;multicast;destination=%s;source=%s;port=%d;ttl=%d\r\n"
"Session: %08X%s\r\n\r\n",
ourClientConnection->fCurrentCSeq,
dateHeader(),
streamingModeString, destAddrStr.val(), sourceAddrStr.val(), ntohs(serverRTPPort.num()), destinationTTL,
fOurSessionId, timeoutParameterString);
break;
}
}
} else {
switch (streamingMode) {
case RTP_UDP: {
snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
"RTSP/1.0 200 OK\r\n"
"CSeq: %s\r\n"
"%s"
"Transport: RTP/AVP;unicast;destination=%s;source=%s;client_port=%d-%d;server_port=%d-%d\r\n"
"Session: %08X%s\r\n\r\n",
ourClientConnection->fCurrentCSeq,
dateHeader(),
destAddrStr.val(), sourceAddrStr.val(), ntohs(clientRTPPort.num()), ntohs(clientRTCPPort.num()), ntohs(serverRTPPort.num()), ntohs(serverRTCPPort.num()),
fOurSessionId, timeoutParameterString);
break;
}
case RTP_TCP: {
if (!fOurRTSPServer.fAllowStreamingRTPOverTCP) {
ourClientConnection->handleCmd_unsupportedTransport();
} else {
snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
"RTSP/1.0 200 OK\r\n"
"CSeq: %s\r\n"
"%s"
"Transport: RTP/AVP/TCP;unicast;destination=%s;source=%s;interleaved=%d-%d\r\n"
"Session: %08X%s\r\n\r\n",
ourClientConnection->fCurrentCSeq,
dateHeader(),
destAddrStr.val(), sourceAddrStr.val(), rtpChannelId, rtcpChannelId,
fOurSessionId, timeoutParameterString);
}
break;
}
case RAW_UDP: {
snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
"RTSP/1.0 200 OK\r\n"
"CSeq: %s\r\n"
"%s"
"Transport: %s;unicast;destination=%s;source=%s;client_port=%d;server_port=%d\r\n"
"Session: %08X%s\r\n\r\n",
ourClientConnection->fCurrentCSeq,
dateHeader(),
streamingModeString, destAddrStr.val(), sourceAddrStr.val(), ntohs(clientRTPPort.num()), ntohs(serverRTPPort.num()),
fOurSessionId, timeoutParameterString);
break;
}
}
}
delete[] streamingModeString;
} while (0);

delete[] concatenatedStreamName;
}
(1.0)根据媒体流名称(文件名)查找相应的session,session是在DSCRIBE命令处理过程中创建。

(2.0)下面处理URL中不带track id的情况,当文件中有一个流的时,允许这种情况出现,这里流名称保存在urlSuffix变量中。

(3.0)若这是这个session所处理的第一个“SETUP”命令,需要构建一个streamState型的数组,并初始化。

(4.0)查找track id对应的subsession是否存在,不存在则进行错误处理

(5.0)例外情况:URL中不存在 track id,仅当只有一个subsession的情况下才充许出现

(6.0)处理Transport头部,获取传输相关信息

(7.0)处理Range头部

(8.0)从subsession中获取参数,fOurSessionId, 标识了一个客户端的session,是在RTSPServer::incomingConnectionHandler函数中生成的随机数,RTP和RTCP在该函数里面创建

(9.0)生成RTSP的应答信息

接下来再看看play命令的处理:

void RTSPServer::RTSPClientSession
::handleCmd_PLAY(RTSPServer::RTSPClientConnection* ourClientConnection,
ServerMediaSubsession* subsession, char const* fullRequestStr) {
char* rtspURL
= fOurRTSPServer.rtspURL(fOurServerMediaSession, ourClientConnection->fClientInputSocket);
unsigned rtspURLSize = strlen(rtspURL);

// Parse the client's "Scale:" header, if any:                                                      (10.0)
float scale;
Boolean sawScaleHeader = parseScaleHeader(fullRequestStr, scale);

// Try to set the stream's scale factor to this value:                                              (11.0)
if (subsession == NULL /*aggregate op*/) {                                                          (12.0)
fOurServerMediaSession->testScaleFactor(scale);                                                   (13.0)
} else {
subsession->testScaleFactor(scale);
}

char buf[100];
char* scaleHeader;
if (!sawScaleHeader) {
buf[0] = '\0'; // Because we didn't see a Scale: header, don't send one back
} else {
sprintf(buf, "Scale: %f\r\n", scale);
}
scaleHeader = strDup(buf);

// Parse the client's "Range:" header, if any:                                                      (14.0)
float duration = 0.0;
double rangeStart = 0.0, rangeEnd = 0.0;
char* absStart = NULL; char* absEnd = NULL;
Boolean startTimeIsNow;
Boolean sawRangeHeader
= parseRangeHeader(fullRequestStr, rangeStart, rangeEnd, absStart, absEnd, startTimeIsNow);

if (sawRangeHeader && absStart == NULL/*not seeking by 'absolute' time*/) {
// Use this information, plus the stream's duration (if known), to create our own "Range:" header, for the response:
duration = subsession == NULL /*aggregate op*/
? fOurServerMediaSession->duration() : subsession->duration();
if (duration < 0.0) {
// We're an aggregate PLAY, but the subsessions have different durations.
// Use the largest of these durations in our header
duration = -duration;
}

// Make sure that "rangeStart" and "rangeEnd" (from the client's "Range:" header)
// have sane values, before we send back our own "Range:" header in our response:
if (rangeStart < 0.0) rangeStart = 0.0;
else if (rangeStart > duration) rangeStart = duration;
if (rangeEnd < 0.0) rangeEnd = 0.0;
else if (rangeEnd > duration) rangeEnd = duration;
if ((scale > 0.0 && rangeStart > rangeEnd && rangeEnd > 0.0) ||
(scale < 0.0 && rangeStart < rangeEnd)) {
// "rangeStart" and "rangeEnd" were the wrong way around; swap them:
double tmp = rangeStart;
rangeStart = rangeEnd;
rangeEnd = tmp;
}
}

// Create a "RTP-Info:" line.  It will get filled in from each subsession's state:                    (15.0)
char const* rtpInfoFmt =
"%s" // "RTP-Info:", plus any preceding rtpInfo items
"%s" // comma separator, if needed
"url=%s/%s"
";seq=%d"
";rtptime=%u"
;
unsigned rtpInfoFmtSize = strlen(rtpInfoFmt);
char* rtpInfo = strDup("RTP-Info: ");
unsigned i, numRTPInfoItems = 0;

// Do any required seeking/scaling on each subsession, before starting streaming.                      (16.0)
// (However, we don't do this if the "PLAY" request was for just a single subsession
// of a multiple-subsession stream; for such streams, seeking/scaling can be done
// only with an aggregate "PLAY".)
for (i = 0; i < fNumStreamStates; ++i) {
if (subsession == NULL /* means: aggregated operation */ || fNumStreamStates == 1) {
if (fStreamStates[i].subsession != NULL) {
if (sawScaleHeader) {
fStreamStates[i].subsession->setStreamScale(fOurSessionId, fStreamStates[i].streamToken, scale);(17.0)
}
if (absStart != NULL) {
// Special case handling for seeking by 'absolute' time:

fStreamStates[i].subsession->seekStream(fOurSessionId, fStreamStates[i].streamToken, absStart, absEnd);
} else {
// Seeking by relative (NPT) time:

u_int64_t numBytes;
if (!sawRangeHeader || startTimeIsNow) {
// We're resuming streaming without seeking, so we just do a 'null' seek
// (to get our NPT, and to specify when to end streaming):
fStreamStates[i].subsession->nullSeekStream(fOurSessionId, fStreamStates[i].streamToken,
rangeEnd, numBytes);
} else {
// We do a real 'seek':
double streamDuration = 0.0; // by default; means: stream until the end of the media           (18.0)
if (rangeEnd > 0.0 && (rangeEnd+0.001) < duration) {
// the 0.001 is because we limited the values to 3 decimal places
// We want the stream to end early.  Set the duration we want:
streamDuration = rangeEnd - rangeStart;
if (streamDuration < 0.0) streamDuration = -streamDuration;                                   (19.0)
// should happen only if scale < 0.0
}
fStreamStates[i].subsession->seekStream(fOurSessionId, fStreamStates[i].streamToken,            (20.0)
rangeStart, streamDuration, numBytes);
}
}
}
}
}

// Create the "Range:" header that we'll send back in our response.
// (Note that we do this after seeking, in case the seeking operation changed the range start time.)
if (absStart != NULL) {
// We're seeking by 'absolute' time:
if (absEnd == NULL) {
sprintf(buf, "Range: clock=%s-\r\n", absStart);
} else {
sprintf(buf, "Range: clock=%s-%s\r\n", absStart, absEnd);
}
delete[] absStart; delete[] absEnd;
} else {
// We're seeking by relative (NPT) time:
if (!sawRangeHeader || startTimeIsNow) {
// We didn't seek, so in our response, begin the range with the current NPT (normal play time):
float curNPT = 0.0;
for (i = 0; i < fNumStreamStates; ++i) {
if (subsession == NULL /* means: aggregated operation */
|| subsession == fStreamStates[i].subsession) {
if (fStreamStates[i].subsession == NULL) continue;
float npt = fStreamStates[i].subsession->getCurrentNPT(fStreamStates[i].streamToken);
if (npt > curNPT) curNPT = npt;
// Note: If this is an aggregate "PLAY" on a multi-subsession stream,
// then it's conceivable that the NPTs of each subsession may differ
// (if there has been a previous seek on just one subsession).
// In this (unusual) case, we just return the largest NPT; I hope that turns out OK...
}
}
rangeStart = curNPT;
}

if (rangeEnd == 0.0 && scale >= 0.0) {
sprintf(buf, "Range: npt=%.3f-\r\n", rangeStart);
} else {
sprintf(buf, "Range: npt=%.3f-%.3f\r\n", rangeStart, rangeEnd);
}
}
char* rangeHeader = strDup(buf);

// Now, start streaming:
for (i = 0; i < fNumStreamStates; ++i) {
if (subsession == NULL /* means: aggregated operation */
|| subsession == fStreamStates[i].subsession) {
unsigned short rtpSeqNum = 0;
unsigned rtpTimestamp = 0;
if (fStreamStates[i].subsession == NULL) continue;
fStreamStates[i].subsession->startStream(fOurSessionId,                                          (21.0)
fStreamStates[i].streamToken,
(TaskFunc*)noteClientLiveness, this,
rtpSeqNum, rtpTimestamp,
RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
const char *urlSuffix = fStreamStates[i].subsession->trackId();
char* prevRTPInfo = rtpInfo;
unsigned rtpInfoSize = rtpInfoFmtSize
+ strlen(prevRTPInfo)
+ 1
+ rtspURLSize + strlen(urlSuffix)
+ 5 /*max unsigned short len*/
+ 10 /*max unsigned (32-bit) len*/
+ 2 /*allows for trailing \r\n at final end of string*/;
rtpInfo = new char[rtpInfoSize];                                                                 (22.0)
sprintf(rtpInfo, rtpInfoFmt,
prevRTPInfo,
numRTPInfoItems++ == 0 ? "" : ",",
rtspURL, urlSuffix,
rtpSeqNum,
rtpTimestamp
);
delete[] prevRTPInfo;
}
}
if (numRTPInfoItems == 0) {
rtpInfo[0] = '\0';
} else {
unsigned rtpInfoLen = strlen(rtpInfo);
rtpInfo[rtpInfoLen] = '\r';
rtpInfo[rtpInfoLen+1] = '\n';
rtpInfo[rtpInfoLen+2] = '\0';
}

// Fill in the response:                                                                           (23.0)
snprintf((char*)ourClientConnection->fResponseBuffer, sizeof ourClientConnection->fResponseBuffer,
"RTSP/1.0 200 OK\r\n"
"CSeq: %s\r\n"
"%s"
"%s"
"%s"
"Session: %08X\r\n"
"%s\r\n",
ourClientConnection->fCurrentCSeq,
dateHeader(),
scaleHeader,
rangeHeader,
fOurSessionId,
rtpInfo);
delete[] rtpInfo; delete[] rangeHeader;
delete[] scaleHeader; delete[] rtspURL;
}
(10.0)分析"Scale:"头部    Scale头,指示了播放的速率,scale = 1为正常播放,大于1快进,小于0则表示快退

(11.0)测试scale的值是否能满足,这期间可能会改变scale的值

(12.0)聚合的情况下,subsession还不确定

(13.0)测试scale的值(注意该函数的调用)

(14.0)分析"Range:"头部 ,"Range:"头部,表示要播放的时间范围。如Range: npt=0.000-,从0时刻开始播放看到结束 不含Range 首部域的PLAY 请求也是合法的。它从媒体流开头开始播放,直到媒体流被暂停

(15.0)创建响应中的"RTP-Info:"行

(16.0)根据要求,在每个subsession上进行seeking/scaling操作

(17.0)设置subsession的scale值

(18.0)计算流的播放时间streamDuration  

(19.0)这里情况下进行快退操作

(20.0)设置每个subsession上的播放时间范围

(21.0)开始各个subsession上的数据传输, 即开始播放了

(22.0)subsession中的信息添加到"RTP-Info:"行中

(23.0)组装响应包的操作

    上面我们介绍了setup和play命令的处理。在上一章中我们知道了RTSP服务的运作,但是还没有涉及到RTP和RTCP,RTP和RTCP是在Setup命令处理的过程中创建的,数据的传输是在paly命令处理过程中开始的。Live555的数据流向我们在下一章中分析,接下来我们要分析的是RTP和RTCP的建立,它是在下面的这个函数中实现的。

void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
netAddressBits clientAddress,
Port const& clientRTPPort,
Port const& clientRTCPPort,
int tcpSocketNum,
unsigned char rtpChannelId,
unsigned char rtcpChannelId,
netAddressBits& destinationAddress,
u_int8_t& /*destinationTTL*/,
Boolean& isMulticast,
Port& serverRTPPort,
Port& serverRTCPPort,
void*& streamToken) {
if (destinationAddress == 0) destinationAddress = clientAddress;
struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
isMulticast = False;

if (fLastStreamToken != NULL && fReuseFirstSource) {                                                 (30.0)
// Special case: Rather than creating a new 'StreamState',
// we reuse the one that we've already created:
serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
++((StreamState*)fLastStreamToken)->referenceCount();                                              (31.0)
streamToken = fLastStreamToken;
} else {
// Normal case: Create a new media source:                                                         (32.0)
unsigned streamBitrate;
FramedSource* mediaSource
= createNewStreamSource(clientSessionId, streamBitrate);                                         (33.0)

// Create 'groupsock' and 'sink' objects for the destination,
// using previously unused server port numbers:
RTPSink* rtpSink = NULL;
BasicUDPSink* udpSink = NULL;
Groupsock* rtpGroupsock = NULL;
Groupsock* rtcpGroupsock = NULL;

if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
portNumBits serverPortNum;
if (clientRTCPPort.num() == 0) {
// We're streaming raw UDP (not RTP). Create a single groupsock:
NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
struct in_addr dummyAddr; dummyAddr.s_addr = 0;

serverRTPPort = serverPortNum;
rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
if (rtpGroupsock->socketNum() >= 0) break; // success
}

udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
} else {                                                                                       (34.0)
// Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
// groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
// (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
struct in_addr dummyAddr; dummyAddr.s_addr = 0;

serverRTPPort = serverPortNum;
rtpGroupsock = createGroupsock(dummyAddr, serverRTPPort);
if (rtpGroupsock->socketNum() < 0) {
delete rtpGroupsock;
continue; // try again
}

if (fMultiplexRTCPWithRTP) {
// Use the RTP 'groupsock' object for RTCP as well:
serverRTCPPort = serverRTPPort;
rtcpGroupsock = rtpGroupsock;
} else {
// Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
serverRTCPPort = ++serverPortNum;
rtcpGroupsock = createGroupsock(dummyAddr, serverRTCPPort);
if (rtcpGroupsock->socketNum() < 0) {
delete rtpGroupsock;
delete rtcpGroupsock;
continue; // try again
}
}

break; // success
}

unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic                            (35.0)
rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
}

// Turn off the destinations for each groupsock.  They'll get set later
// (unless TCP is used instead):
if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();

if (rtpGroupsock != NULL) {                                                                     (36.0)
// Try to use a big send buffer for RTP -  at least 0.1 second of
// specified bandwidth and at least 50 KB
unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
}
}

// Set up the state of the stream.  The stream will get started later:                            (37.0)
streamToken = fLastStreamToken
= new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
streamBitrate, mediaSource,
rtpGroupsock, rtcpGroupsock);
}

// Record these destinations as being for this client session id:                                   (38.0)
Destinations* destinations;
if (tcpSocketNum < 0) { // UDP
destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
} else { // TCP
destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
}
fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}
(30.0)当fReuseFirstSource参数为True时,不需要再创建source,sink, groupsock等实例,只需要记录客户端的地址即可

(31.0)增加引用记数

(32.0)正常情况下,创建一个新的media source

(33.0)创建source,在处理DESCRIBE命令时也创建过,那是在OnDemandServerMediaSubsession::sdpLines()函数中, 但参数clientSessionId为0。

(34.0)创建一对groupsocks实例,分别用于传输RTP、RTCP。RTP、RTCP的端口号是相邻的,并且RTP端口号为偶数。初始端口fInitialPortNum = 6970,这是OnDemandServerMediaSubsession构造函数的缺省参数

(35.0)创建RTPSink

(36.0)重新配置发送RTP 的socket缓冲区大小

(37.0)建立流的状态对像(stream token),其它包括sink、source、groupsock等的对应关系注意,live555中定义了两个StreamState结构,这里的StreamState定义为一个类。在RTSPServer中,定义了一个内部结构体StreamState,其streamToken成员指向此处的StreamState实例

(38.0)这里定义了类Destinations来保存目的地址、RTP端口、RTCP端口,并将其与对应的clientSessionId保存到哈希表 fDestinationsHashTable中,这个哈希表是定义在OnDemandServerMediaSubsession类中
    到这里已经创建好了RTSP,RTP和RTCP,接下来就是数据的传输的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: