您的位置:首页 > 运维架构 > 网站架构

crtmpserver基本架构(协议和服务对应关系)

2015-09-28 11:11 615 查看
<pre name="code" class="html">
/**
 * Builds the textual "Services" table.
 *
 * The table is 79 characters wide: a title line, a column-header line and
 * then whatever rows each application returned by
 * ClientApplicationManager::GetAllApplications() contributes through its own
 * GetServicesInfo(). Column widths are 3 / 15 / 5 / 25 / 25, matching the
 * border lines below.
 *
 * @return the complete table; the closing border carries no trailing newline
 */
string ConfigFile::GetServicesInfo() {
    map<uint32_t, BaseClientApplication *> applications = ClientApplicationManager::GetAllApplications();

    stringstream ss;

    // Title: "Services" right-aligned inside the 77 inner columns.
    ss << "+-----------------------------------------------------------------------------+" << endl;
    ss << "|";
    ss.width(77);
    ss << "Services";
    ss << "|" << endl;

    // Column headers. Each caption is padded to the width of its column so
    // the '|' separators line up with the '+' marks of the border lines.
    ss << "+---+---------------+-----+-------------------------+-------------------------+" << endl;
    ss << "| c |      ip       | port|   protocol stack name   |     application name    |" << endl;

    // Every application appends its own rows (each row starts with a border).
    FOR_MAP(applications, uint32_t, BaseClientApplication *, i) {
        ss << MAP_VAL(i)->GetServicesInfo();
    }

    ss << "+---+---------------+-----+-------------------------+-------------------------+";

    return ss.str();
}


/**
 * Concatenates the service rows produced by GetServiceInfo() for every
 * handler returned by IOHandlerManager::GetActiveHandlers().
 *
 * @return the joined rows; empty when no handler contributes a row
 */
string BaseClientApplication::GetServicesInfo() {
    map<uint32_t, IOHandler *> activeHandlers = IOHandlerManager::GetActiveHandlers();

    string rows;

    FOR_MAP(activeHandlers, uint32_t, IOHandler *, it) {
        // GetServiceInfo() yields "" for handlers that produce no row.
        rows.append(GetServiceInfo(MAP_VAL(it)));
    }

    return rows;
}


/**
 * Formats one row of the services table for a single I/O handler.
 *
 * A row is produced only when the handler is a TCP acceptor or a UDP carrier
 * whose application has the same id as this application, and whose
 * parameters are a map. In every other case an empty string is returned.
 *
 * @param pIOHandler handler to describe; it is dereferenced without a NULL
 *                   check, so callers must pass a valid pointer
 * @return a border line followed by the
 *         "|tcp/udp|ip|port|protocol stack|application|" line, each
 *         terminated by a newline, or "" when the handler yields no row
 */
string BaseClientApplication::GetServiceInfo(IOHandler *pIOHandler) {
    const bool isAcceptor = (pIOHandler->GetType() == IOHT_ACCEPTOR);
    if (!isAcceptor && (pIOHandler->GetType() != IOHT_UDP_CARRIER))
        return "";

    // Skip handlers that are not attached to this very application.
    if (isAcceptor) {
        TCPAcceptor *pAcceptor = static_cast<TCPAcceptor *>(pIOHandler);
        if ((pAcceptor->GetApplication() == NULL)
                || (pAcceptor->GetApplication()->GetId() != GetId())) {
            return "";
        }
    } else {
        UDPCarrier *pCarrier = static_cast<UDPCarrier *>(pIOHandler);
        if ((pCarrier->GetProtocol() == NULL)
                || (pCarrier->GetProtocol()->GetNearEndpoint()->GetApplication() == NULL)
                || (pCarrier->GetProtocol()->GetNearEndpoint()->GetApplication()->GetId() != GetId())) {
            return "";
        }
    }

    // Bind to the handler's parameters by reference.
    Variant &params = isAcceptor
            ? static_cast<TCPAcceptor *>(pIOHandler)->GetParameters()
            : static_cast<UDPCarrier *>(pIOHandler)->GetParameters();
    if (params != V_MAP)
        return "";

    stringstream ss;
    ss << "+---+---------------+-----+-------------------------+-------------------------+" << endl;

    // width() applies only to the next insertion, so it is set before each cell.
    ss << "|";
    ss.width(3);
    ss << (isAcceptor ? "tcp" : "udp");
    ss << "|";

    // Room for a dotted IPv4 address: four 3-digit groups plus three dots.
    ss.width(3 * 4 + 3);
    ss << (string) params[CONF_IP];
    ss << "|";

    ss.width(5);
    ss << (uint16_t) params[CONF_PORT];
    ss << "|";

    ss.width(25);
    ss << (string) params[CONF_PROTOCOL];
    ss << "|";

    ss.width(25);
    ss << GetName();
    ss << "|";

    ss << endl;

    return ss.str();
}


下面是一段日志打印(即上面 GetServicesInfo 生成的服务列表):

+-----------------------------------------------------------------------------+  
|                                                                     Services|  
+---+---------------+-----+-------------------------+-------------------------+  
| c |      ip       | port|   protocol stack name   |     application name    |  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 1112|           inboundJsonCli|                    admin|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 1935|              inboundRtmp|              appselector|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 8081|             inboundRtmps|              appselector|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 8080|             inboundRtmpt|              appselector|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 6666|           inboundLiveFlv|              flvplayback|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 9999|             inboundTcpTs|              flvplayback|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 5544|              inboundRtsp|              flvplayback|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 6665|           inboundLiveFlv|             proxypublish|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 8989|         httpEchoProtocol|            samplefactory|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 8988|             echoProtocol|            samplefactory|  
+---+---------------+-----+-------------------------+-------------------------+  
|tcp|        0.0.0.0| 1111|    inboundHttpXmlVariant|                  vptests|  
+---+---------------+-----+-------------------------+-------------------------+
// NOTE(review): this is an excerpt from the middle of a function whose
// header is not part of this listing; `fd` and the underscore-prefixed
// names are declared outside the excerpt.
//4. Create the chain
// Ask the protocol factory for the chain named by _protocolChain, handing it
// _parameters.
BaseProtocol *pProtocol = ProtocolFactoryManager::CreateProtocolChain(
_protocolChain, _parameters);
if (pProtocol == NULL) {
// No chain could be built: log it, pass fd to CLOSE_SOCKET and report failure.
FATAL("Unable to create protocol chain");
CLOSE_SOCKET(fd);
return false;
}
/**
 * Handles one select() event for this carrier.
 *
 * SET_READ:  reads from _inboundFd into the protocol's input buffer, adds the
 *            byte count to _rx and forwards it via SignalInputData().
 * SET_WRITE: writes the protocol's output buffers to _outboundFd, adding the
 *            byte counts to _tx, until either no buffer is returned or a
 *            buffer still has bytes available after a write.
 * Any other event type triggers ASSERT and yields false.
 *
 * @param event the event to process; only event.type is inspected here
 * @return false on read/write failure or unknown event type; for SET_READ
 *         the result of SignalInputData(); true after a SET_WRITE pass
 */
bool TCPCarrier::OnEvent(select_event &event) {
    switch (event.type) {
        case SET_READ:
        {
            int32_t bytesRead = 0;
            IOBuffer *pInputBuffer = _pProtocol->GetInputBuffer();
            assert(pInputBuffer != NULL);

            const bool readOk = pInputBuffer->ReadFromTCPFd(_inboundFd,
                    _recvBufferSize, bytesRead);
            if (!readOk) {
                FATAL("Unable to read data. %s:%hu -> %s:%hu",
                        STR(_farIp), _farPort,
                        STR(_nearIp), _nearPort);
                return false;
            }

            _rx += bytesRead;
            return _pProtocol->SignalInputData(bytesRead);
        }
        case SET_WRITE:
        {
            int32_t bytesWritten = 0;

            for (;;) {
                IOBuffer *pPending = _pProtocol->GetOutputBuffer();
                if (pPending == NULL) {
                    // The protocol returned no output buffer.
                    DISABLE_WRITE_DATA;
                    return true;
                }

                if (!pPending->WriteToTCPFd(_outboundFd,
                        _sendBufferSize, bytesWritten)) {
                    FATAL("Unable to send data. %s:%hu -> %s:%hu",
                            STR(_farIp), _farPort,
                            STR(_nearIp), _nearPort);
                    IOHandlerManager::EnqueueForDelete(this);
                    return false;
                }
                _tx += bytesWritten;

                if (GETAVAILABLEBYTESCOUNT(*pPending) > 0) {
                    // Bytes are still available in the buffer: stop for now.
                    ENABLE_WRITE_DATA;
                    return true;
                }
            }
        }
        default:
        {
            ASSERT("Invalid state: %hhu", event.type);
            return false;
        }
    }
}

/**
 * Adds the received byte count to _decodedBytesCount and passes the input
 * buffer on to the near protocol.
 *
 * @param recvAmount number of bytes reported by the caller
 * @return whatever the near protocol's SignalInputData() returns
 */
bool TCPProtocol::SignalInputData(int32_t recvAmount) {
    _decodedBytesCount += recvAmount;

    const bool handled = _pNearProtocol->SignalInputData(_inputBuffer);
    return handled;
}

/**
 * Processes incoming bytes for this RTMP connection.
 *
 * Before the handshake is complete the buffer goes to PerformHandshake();
 * once it completes, the same buffer is processed again through this method
 * and, for PT_OUTBOUND_RTMP protocols, the protocol handler is notified.
 * After the handshake the buffer goes to ProcessBytes(), and an ack message
 * is sent each time the decoded byte count reaches
 * _nextReceivedBytesCountReport.
 *
 * @param buffer input buffer to process
 * @return true when _enqueueForDelete is set or processing succeeded;
 *         false on handshake, processing or send failure
 */
bool BaseRTMPProtocol::SignalInputData(IOBuffer &buffer) {
    if (_enqueueForDelete)
        return true;

    if (!_handshakeCompleted) {
        if (!PerformHandshake(buffer)) {
            FATAL("Unable to perform handshake");
            return false;
        }

        // The handshake step succeeded but is not finished yet.
        if (!_handshakeCompleted)
            return true;

        // Handshake just completed: run the same buffer through the
        // post-handshake path.
        bool ok = SignalInputData(buffer);
        if (ok && (GetType() == PT_OUTBOUND_RTMP)) {
            ok = _pProtocolHandler->OutboundConnectionEstablished(
                    (OutboundRTMPProtocol *) this);
        }
        return ok;
    }

    const bool processed = ProcessBytes(buffer);
    uint64_t decodedBytes = GetDecodedBytesCount();
    if (processed && (decodedBytes >= _nextReceivedBytesCountReport)) {
        Variant ackMessage = GenericMessageFactory::GetAck(decodedBytes);
        _nextReceivedBytesCountReport += _winAckSize;
        if (!SendMessage(ackMessage)) {
            FATAL("Unable to send\n%s", STR(ackMessage.ToString()));
            return false;
        }
    }
    return processed;
}


从日志上看,crtmpserver 为每一种协议分别启动一个服务。也就是说,每个监听端口只对应一种协议栈(并归属于某个应用),这样处理比较简单。

这个对应关系是在配置文件里配置好的,比如 tcp-8080-inboundRtmpt(即 TCP 的 8080 端口对应 inboundRtmpt 协议栈)。
整体上看共分三层:配置层控制启动哪些服务,IO 层负责网络数据的收发,协议层只提供不同的协议封装。层次清晰。

|---------------------|

|         配置           |

|---------------------|

|          IO              |

|---------------------|

|         proto          |

|---------------------|
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: