crtmpserver基本架构(协议和服务对应关系)
2015-09-28 11:11
615 查看
<pre name="code" class="html">string ConfigFile::GetServicesInfo() {
map<uint32_t, BaseClientApplication *> applications = ClientApplicationManager::GetAllApplications();
stringstream ss;
ss << "+-----------------------------------------------------------------------------+" << endl;
ss << "|";
ss.width(77);
ss << "Services";
ss << "|" << endl;
ss << "+---+---------------+-----+-------------------------+-------------------------+" << endl;
ss << "| c | ip | port| protocol stack name | application name |" << endl;
FOR_MAP(applications, uint32_t, BaseClientApplication *, i) {
ss << MAP_VAL(i)->GetServicesInfo();
}
ss << "+---+---------------+-----+-------------------------+-------------------------+";
return ss.str();
}
string BaseClientApplication::GetServicesInfo() {map<uint32_t, IOHandler *> handlers = IOHandlerManager::GetActiveHandlers();string result = "";FOR_MAP(handlers, uint32_t, IOHandler *, i) {result += GetServiceInfo(MAP_VAL(i));}return result;}
下面是一段日志打印
+-----------------------------------------------------------------------------+
| Services|
+---+---------------+-----+-------------------------+-------------------------+
| c | ip | port| protocol stack name | application name |
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 1112| inboundJsonCli| admin|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 1935| inboundRtmp| appselector|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8081| inboundRtmps| appselector|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8080| inboundRtmpt| appselector|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 6666| inboundLiveFlv| flvplayback|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 9999| inboundTcpTs| flvplayback|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 5544| inboundRtsp| flvplayback|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 6665| inboundLiveFlv| proxypublish|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8989| httpEchoProtocol| samplefactory|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8988| echoProtocol| samplefactory|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 1111| inboundHttpXmlVariant| vptests|
+---+---------------+-----+-------------------------+-------------------------
//4. Create the chain
BaseProtocol *pProtocol = ProtocolFactoryManager::CreateProtocolChain(
_protocolChain, _parameters);
if (pProtocol == NULL) {
FATAL("Unable to create protocol chain");
CLOSE_SOCKET(fd);
return false;
}
从日志上看,crtmpserver是对每一种协议起不同的服务。也就是访问单个端口的应用对应一定的协议,这样处理比较简单。
这个对应关系是配置文件里配置好的,比如 tcp-8080-binboundRtmpt
整体上看共分三层,协议层只提供不同的协议封装,配置层控制启动哪些服务。层次清晰。
|---------------------|
| 配置 |
|---------------------|
| IO |
|---------------------|
| proto |
|---------------------|
map<uint32_t, BaseClientApplication *> applications = ClientApplicationManager::GetAllApplications();
stringstream ss;
ss << "+-----------------------------------------------------------------------------+" << endl;
ss << "|";
ss.width(77);
ss << "Services";
ss << "|" << endl;
ss << "+---+---------------+-----+-------------------------+-------------------------+" << endl;
ss << "| c | ip | port| protocol stack name | application name |" << endl;
FOR_MAP(applications, uint32_t, BaseClientApplication *, i) {
ss << MAP_VAL(i)->GetServicesInfo();
}
ss << "+---+---------------+-----+-------------------------+-------------------------+";
return ss.str();
}
string BaseClientApplication::GetServicesInfo() {map<uint32_t, IOHandler *> handlers = IOHandlerManager::GetActiveHandlers();string result = "";FOR_MAP(handlers, uint32_t, IOHandler *, i) {result += GetServiceInfo(MAP_VAL(i));}return result;}
string BaseClientApplication::GetServiceInfo(IOHandler *pIOHandler) { if ((pIOHandler->GetType() != IOHT_ACCEPTOR) && (pIOHandler->GetType() != IOHT_UDP_CARRIER)) return ""; if (pIOHandler->GetType() == IOHT_ACCEPTOR) { if ((((TCPAcceptor *) pIOHandler)->GetApplication() == NULL) || (((TCPAcceptor *) pIOHandler)->GetApplication()->GetId() != GetId())) { return ""; } } else { if ((((UDPCarrier *) pIOHandler)->GetProtocol() == NULL) || (((UDPCarrier *) pIOHandler)->GetProtocol()->GetNearEndpoint()->GetApplication() == NULL) || (((UDPCarrier *) pIOHandler)->GetProtocol()->GetNearEndpoint()->GetApplication()->GetId() != GetId())) { return ""; } } Variant ¶ms = pIOHandler->GetType() == IOHT_ACCEPTOR ? ((TCPAcceptor *) pIOHandler)->GetParameters() : ((UDPCarrier *) pIOHandler)->GetParameters(); if (params != V_MAP) return ""; stringstream ss; ss << "+---+---------------+-----+-------------------------+-------------------------+" << endl; ss << "|"; ss.width(3); ss << (pIOHandler->GetType() == IOHT_ACCEPTOR ? "tcp" : "udp"); ss << "|"; ss.width(3 * 4 + 3); ss << (string) params[CONF_IP]; ss << "|"; ss.width(5); ss << (uint16_t) params[CONF_PORT]; ss << "|"; ss.width(25); ss << (string) params[CONF_PROTOCOL]; ss << "|"; ss.width(25); ss << GetName(); ss << "|"; ss << endl; return ss.str(); }
下面是一段日志打印
+-----------------------------------------------------------------------------+
| Services|
+---+---------------+-----+-------------------------+-------------------------+
| c | ip | port| protocol stack name | application name |
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 1112| inboundJsonCli| admin|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 1935| inboundRtmp| appselector|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8081| inboundRtmps| appselector|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8080| inboundRtmpt| appselector|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 6666| inboundLiveFlv| flvplayback|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 9999| inboundTcpTs| flvplayback|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 5544| inboundRtsp| flvplayback|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 6665| inboundLiveFlv| proxypublish|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8989| httpEchoProtocol| samplefactory|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 8988| echoProtocol| samplefactory|
+---+---------------+-----+-------------------------+-------------------------+
|tcp| 0.0.0.0| 1111| inboundHttpXmlVariant| vptests|
+---+---------------+-----+-------------------------+-------------------------
//4. Create the chain
BaseProtocol *pProtocol = ProtocolFactoryManager::CreateProtocolChain(
_protocolChain, _parameters);
if (pProtocol == NULL) {
FATAL("Unable to create protocol chain");
CLOSE_SOCKET(fd);
return false;
}
bool TCPCarrier::OnEvent(select_event &event) { int32_t readAmount = 0; int32_t writeAmount = 0; //3. Do the I/O switch (event.type) { case SET_READ: { IOBuffer *pInputBuffer = _pProtocol->GetInputBuffer(); assert(pInputBuffer != NULL); if (!pInputBuffer->ReadFromTCPFd(_inboundFd, _recvBufferSize, readAmount)) { FATAL("Unable to read data. %s:%hu -> %s:%hu", STR(_farIp), _farPort, STR(_nearIp), _nearPort); return false; } _rx += readAmount; return _pProtocol->SignalInputData(readAmount); } case SET_WRITE: { IOBuffer *pOutputBuffer = NULL; while ((pOutputBuffer = _pProtocol->GetOutputBuffer()) != NULL) { if (!pOutputBuffer->WriteToTCPFd(_outboundFd, _sendBufferSize, writeAmount)) { FATAL("Unable to send data. %s:%hu -> %s:%hu", STR(_farIp), _farPort, STR(_nearIp), _nearPort); IOHandlerManager::EnqueueForDelete(this); return false; } _tx += writeAmount; if (GETAVAILABLEBYTESCOUNT(*pOutputBuffer) > 0) { ENABLE_WRITE_DATA; break; } } if (pOutputBuffer == NULL) { DISABLE_WRITE_DATA; } return true; } default: { ASSERT("Invalid state: %hhu", event.type); return false; } } }
bool TCPProtocol::SignalInputData(int32_t recvAmount) { _decodedBytesCount += recvAmount; return _pNearProtocol->SignalInputData(_inputBuffer); }
bool BaseRTMPProtocol::SignalInputData(IOBuffer &buffer) { if (_enqueueForDelete) return true; bool result = false; if (_handshakeCompleted) { result = ProcessBytes(buffer); uint64_t decodedBytes = GetDecodedBytesCount(); if (result && (decodedBytes >= _nextReceivedBytesCountReport)) { Variant _bytesReadMessage = GenericMessageFactory::GetAck(decodedBytes); _nextReceivedBytesCountReport += _winAckSize; if (!SendMessage(_bytesReadMessage)) { FATAL("Unable to send\n%s", STR(_bytesReadMessage.ToString())); return false; } } } else { result = PerformHandshake(buffer); if (!result) { FATAL("Unable to perform handshake"); return false; } if (_handshakeCompleted) { result = SignalInputData(buffer); if (result && (GetType() == PT_OUTBOUND_RTMP)) { result = _pProtocolHandler->OutboundConnectionEstablished( (OutboundRTMPProtocol *) this); } } } return result; }
从日志上看,crtmpserver是对每一种协议起不同的服务。也就是访问单个端口的应用对应一定的协议,这样处理比较简单。
这个对应关系是配置文件里配置好的,比如 tcp-8080-binboundRtmpt
整体上看共分三层,协议层只提供不同的协议封装,配置层控制启动哪些服务。层次清晰。
|---------------------|
| 配置 |
|---------------------|
| IO |
|---------------------|
| proto |
|---------------------|
相关文章推荐
- Web API应用架构在Winform混合框架中的应用(2)--自定义异常结果的处理
- 分享:如何稳步提升网站的排名
- 架构师和数学
- 一步步构建大型网站架构
- JS实现淘宝支付宝网站的控制台菜单效果
- php 大流量网站访问
- 部署mysql高可用、读写分离集群 推荐
- 交换芯片架构 (一)
- JS+CSS实现六级网站导航主菜单效果
- IE11兼容性问题——网站提示“打印插件只支持IE浏览器”
- Lvs+Keepalived+MySQL Cluster架设高可用负载均衡Mysql集群
- 一个不错的拳皇技巧学习网站
- 学Android开发的人可以去的几个网站
- sap apo 软件的架构和设计分析
- 学Android开发的人可以去的几个网站
- Django后台:少量代码,实现强大的网站后台
- JS+CSS实现六级网站导航主菜单效果
- JS实现淘宝支付宝网站的控制台菜单效果
- 网站关键词优化--新手站6长怎么保持关键词排名的稳定
- 用Wget下载整个网站