
Relaying Live DVB Streams with Live555

2015-06-09 15:32
    I've recently been working on a streaming-media project. The requirement, briefly: take the television content playing on a DVB receiver, push it through a streaming server onto the local network, and let users watch the programs on a phone or a PC.

    From what I've read, Live555 is not the best streaming solution available, but for getting a demo out quickly it is a reasonable choice. I covered porting Live555 in an earlier post, so I won't repeat that here.

    Enough preamble; straight to the substance.

    1. What needs to change in Live555

    Two parts of Live555 need to be modified:

    1) the Source

    2) the MediaSubsession

    Since the data read from the DVB tuner is a TS-format stream, we model our code on the TS-file handling classes. The two reference classes, and their places in the Live555 inheritance hierarchy, are:

    1)ByteStreamFileSource



    2)MPEG2TransportFileServerMediaSubsession



2. The new classes and their inheritance

1)ByteStreamLiveSource



2)MPEG2TransportLiveServerMediaSubsession



3. Implementing the two classes

1) Define a data type to hold the data read from the DVB tuner

    File: LiveTSType.hh

    Note: because we are reading a live TS stream, the PAT and PMT tables must be re-inserted into it periodically, so that a client can obtain the stream's audio/video format information no matter when it joins. (A sketch of the writer side that fills this buffer follows the header below.)

#ifndef __LIVE_TS_BUFFER_TYPE_HH
#define __LIVE_TS_BUFFER_TYPE_HH

#ifdef __cplusplus
extern "C"{
#endif

#define TRANSPORT_SYNC_BYTE 0x47        // TS stream sync byte
#define REC_BUF_MAX_LEN (256*1024+2*188)  // 256 KB of payload per buffer, plus 2*188 bytes for the PAT and PMT packets

typedef struct s_buffer
{
unsigned char   buf_writing;        // non-zero while the buffer is being written; data is unstable during a write, so readers must wait for it to finish
unsigned int    buf_len;            // actual length of the data written
unsigned int    buf_read_counter;   // number of bytes already read
unsigned char   buf_data[REC_BUF_MAX_LEN];  // the data itself
} t_TS_Buffer;

#ifdef __cplusplus
}
#endif

#endif
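    For completeness, here is a minimal sketch of the writer side, i.e. the DVB capture thread that fills a t_TS_Buffer. Everything in it is a placeholder for illustration: dvb_read_ts() stands in for whatever driver call delivers raw TS data, and g_pat_packet/g_pmt_packet are assumed to be 188-byte PAT and PMT packets cached earlier from the stream. The point is the handshake around buf_writing: set it, copy the tables plus fresh payload, update buf_len, then clear it.

// Hypothetical writer side: fills one t_TS_Buffer per cycle.
// dvb_read_ts(), g_pat_packet and g_pmt_packet are placeholders,
// not part of Live555 or of the code in this post.
#include <string.h>
#include "LiveTSType.hh"

extern int dvb_read_ts(unsigned char *dst, unsigned int len); // driver read; returns bytes read
extern unsigned char g_pat_packet[188];   // PAT packet captured earlier from the stream
extern unsigned char g_pmt_packet[188];   // PMT packet captured earlier from the stream

void fill_ts_buffer(t_TS_Buffer *p_tsbuf)
{
    p_tsbuf->buf_writing = 1;   // readers must wait from here on

    // Re-insert PAT and PMT so a client can join the stream at any moment
    memcpy(p_tsbuf->buf_data, g_pat_packet, 188);
    memcpy(p_tsbuf->buf_data + 188, g_pmt_packet, 188);

    // Append up to 256 KB of fresh payload from the tuner
    int n = dvb_read_ts(p_tsbuf->buf_data + 2*188, 256*1024);
    if (n < 0) n = 0;

    p_tsbuf->buf_len = 2*188 + (unsigned int)n;
    p_tsbuf->buf_read_counter = 0;
    p_tsbuf->buf_writing = 0;   // buffer is stable again
}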


2) The ByteStreamLiveSource class

    File: ByteStreamLiveSource.hh

#ifndef _BYTE_STREAM_LIVE_SOURCE_HH
#define _BYTE_STREAM_LIVE_SOURCE_HH

#ifndef _FRAMED_SOURCE_HH
#include "FramedSource.hh"
#endif

#ifndef __LIVE_TS_BUFFER_TYPE_HH
#include "LiveTSType.hh"
#endif

class ByteStreamLiveSource: public FramedSource
{
public:
static ByteStreamLiveSource* createNew(UsageEnvironment& env,
t_TS_Buffer *p_tsbuf,
unsigned preferredFrameSize = 0,
unsigned playTimePerFrame = 0);

//void seekToByteAbsolute(u_int64_t byteNumber, u_int64_t numBytesToStream = 0);
// if "numBytesToStream" is >0, then we limit the stream to that number of bytes, before treating it as EOF
//void seekToByteRelative(int64_t offset, u_int64_t numBytesToStream = 0);
//void seekToEnd(); // to force EOF handling on the next read

protected:
ByteStreamLiveSource(UsageEnvironment& env,
t_TS_Buffer *p_tsbuf,
unsigned preferredFrameSize,
unsigned playTimePerFrame);
// called only by createNew()

virtual ~ByteStreamLiveSource();

//static void fileReadableHandler(ByteStreamFileSource* source, int mask);
void doReadFromBuffer();

private:
// redefined virtual functions:
virtual void doGetNextFrame();
virtual void doStopGettingFrames();

private:
unsigned fPreferredFrameSize;
unsigned fPlayTimePerFrame;
//Boolean fFidIsSeekable;
unsigned fLastPlayTime;
Boolean fHaveStartedReading;
Boolean fLimitNumBytesToStream;
u_int64_t fNumBytesToStream; // used iff "fLimitNumBytesToStream" is True

// Added by zhaoyulei, for live ts buffer
t_TS_Buffer *fPTsBuf;       // pointer to the shared live TS buffer
t_TS_Buffer fLocalBuf;      // local working copy of the buffer
};

#endif


    File: ByteStreamLiveSource.cpp
    Note: the main thing to watch is error handling, for example the case where the first byte read is not 0x47. The comments in the code are fairly detailed, so I won't go through it line by line.

#include "ByteStreamLiveSource.hh"
#include "GroupsockHelper.hh"

////////// ByteStreamLiveSource //////////

#include "LiveTSType.hh"

ByteStreamLiveSource*
ByteStreamLiveSource::createNew(UsageEnvironment& env,
t_TS_Buffer *p_tsbuf,
unsigned preferredFrameSize,
unsigned playTimePerFrame)
{
printf("=== zyl ===, %s, %d\n",__FILE__, __LINE__);

if (p_tsbuf == NULL)
return NULL;

ByteStreamLiveSource* newSource
= new ByteStreamLiveSource(env, p_tsbuf, preferredFrameSize, playTimePerFrame);
//newSource->fPTsBuf = p_tsbuf;
// Initialize the local buffer
newSource->fLocalBuf.buf_writing = 0;
newSource->fLocalBuf.buf_len = 0;
newSource->fLocalBuf.buf_read_counter = 0;
memset(newSource->fLocalBuf.buf_data, 0, REC_BUF_MAX_LEN);

return newSource;
}

ByteStreamLiveSource::ByteStreamLiveSource(UsageEnvironment& env,
t_TS_Buffer *p_tsbuf,
unsigned preferredFrameSize,
unsigned playTimePerFrame)
: FramedSource(env), fPreferredFrameSize(preferredFrameSize),
fPlayTimePerFrame(playTimePerFrame), fLastPlayTime(0),
fHaveStartedReading(False), fLimitNumBytesToStream(False), fNumBytesToStream(0),
fPTsBuf(p_tsbuf)
{

}

ByteStreamLiveSource::~ByteStreamLiveSource()
{

}

void ByteStreamLiveSource::doGetNextFrame()
{
if (fLimitNumBytesToStream && fNumBytesToStream == 0)
{
handleClosure();
return;
}

doReadFromBuffer();
}

void ByteStreamLiveSource::doStopGettingFrames()
{
envir().taskScheduler().unscheduleDelayedTask(nextTask());
}

void ByteStreamLiveSource::doReadFromBuffer()
{
//printf("=== zyl ===, %s, %d\n",__FILE__, __LINE__);
// Initialize the counters
unsigned int readLen = 0;
unsigned int syncBytePosition = 0;

// Not really needed here; fMaxSize is the variable that used to cap the number of bytes read
fMaxSize = fPreferredFrameSize;

//printf("=== zyl ===, fLocalBuf.buf_read_counter = %d, fLocalBuf.buf_len = %d\n",
//            fLocalBuf.buf_read_counter, fLocalBuf.buf_len);
// Initialize the frame size
fFrameSize = 0;

// If the local buffer does not hold enough remaining bytes, read whatever is left first
if((fLocalBuf.buf_len - fLocalBuf.buf_read_counter) < fPreferredFrameSize)
{
// fMaxSize = fLocalBuf.buf_len - fLocalBuf.buf_read_counter;
// Number of bytes to read
readLen = fLocalBuf.buf_len - fLocalBuf.buf_read_counter;

// Read them
memcpy(fTo, (fLocalBuf.buf_data + fLocalBuf.buf_read_counter), readLen);
//fMaxSize += fLocalBuf.buf_len - fLocalBuf.buf_read_counter;

// Update the count of bytes already read
fLocalBuf.buf_read_counter += readLen;

// Update the current frame size
fFrameSize += readLen;
}

// If the local buffer has been fully consumed, pull in the next shared buffer
if(fLocalBuf.buf_read_counter == fLocalBuf.buf_len)
{
// Busy-wait until the capture thread finishes writing the shared buffer
while(fPTsBuf->buf_writing != 0)
{
printf("=== zyl === waiting for buf_writing\n");
}
#if 0
for(i = 0; i < 188; i++)
{
printf("%02x, ", fPTsBuf->buf_data[i]);
if ((i+1)%16 == 0)
printf("\n");
}
printf("\n");
#endif
memcpy(fLocalBuf.buf_data, fPTsBuf->buf_data, fPTsBuf->buf_len);
fLocalBuf.buf_read_counter = 0;
fLocalBuf.buf_len = fPTsBuf->buf_len;
}

// If we still have not gathered enough bytes
if(fFrameSize < fPreferredFrameSize)
{
// This many more bytes are needed
readLen = fPreferredFrameSize - fFrameSize;

// Read them, offsetting the destination pointer accordingly
memcpy(fTo+fFrameSize, (fLocalBuf.buf_data + fLocalBuf.buf_read_counter), readLen);

// Update the count of bytes already read
fLocalBuf.buf_read_counter += readLen;

// Update the current frame size
fFrameSize += readLen;
}

// If the first byte of the data we read is not 0x47, resynchronize
// (the scan is bounded by fFrameSize so we never run past the data we hold)
while(syncBytePosition < fFrameSize && TRANSPORT_SYNC_BYTE != fTo[syncBytePosition])
{
syncBytePosition++;
}
if(0 != syncBytePosition)
{
printf("=== zyl === syncBytePosition !=0\n");
memmove(fTo, &fTo[syncBytePosition], fFrameSize - syncBytePosition);
fFrameSize -= syncBytePosition;

// After discarding the leading garbage we may be short again
if(fFrameSize < fPreferredFrameSize)
{
// This many more bytes are needed
readLen = fPreferredFrameSize - fFrameSize;

// Read them, offsetting the destination pointer accordingly
memcpy(fTo+fFrameSize, (fLocalBuf.buf_data + fLocalBuf.buf_read_counter), readLen);

// Update the count of bytes already read
fLocalBuf.buf_read_counter += readLen;

// Update the current frame size
fFrameSize += readLen;
}
}

//printf("=== zyl === ,fLocalBuf.buf_read_counter = %d, fLocalBuf.buf_len = %d\n",
//            fLocalBuf.buf_read_counter, fLocalBuf.buf_len);

if (fFrameSize == 0)
{
handleClosure();
return;
}
//fNumBytesToStream -= fFrameSize;

// Set the 'presentation time':
if (fPlayTimePerFrame > 0 && fPreferredFrameSize > 0)
{
if (fPresentationTime.tv_sec == 0 && fPresentationTime.tv_usec == 0)
{
// This is the first frame, so use the current time:
gettimeofday(&fPresentationTime, NULL);
}
else
{
// Increment by the play time of the previous data:
unsigned uSeconds	= fPresentationTime.tv_usec + fLastPlayTime;
fPresentationTime.tv_sec += uSeconds/1000000;
fPresentationTime.tv_usec = uSeconds%1000000;
}

// Remember the play time of this data:
fLastPlayTime = (fPlayTimePerFrame*fFrameSize)/fPreferredFrameSize;
fDurationInMicroseconds = fLastPlayTime;
}
else
{
// We don't know a specific play time duration for this data,
// so just record the current time as being the 'presentation time':
gettimeofday(&fPresentationTime, NULL);
}

// Inform the reader that he has data:
// Because the file read was done from the event loop, we can call the
// 'after getting' function directly, without risk of infinite recursion:
FramedSource::afterGetting(this);
}
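    A design note on doReadFromBuffer(): it spins on buf_writing, which is fine for a demo but burns CPU whenever the reader catches up with the capture thread. If that becomes a problem, the handshake can be replaced with a condition variable. Below is a minimal sketch under that assumption; ts_buf_lock, ts_buf_ready and the two helper functions are hypothetical names, not part of Live555 or of the code above.

// Hypothetical replacement for the buf_writing busy-wait.
#include <pthread.h>
#include "LiveTSType.hh"

static pthread_mutex_t ts_buf_lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ts_buf_ready = PTHREAD_COND_INITIALIZER;

// Reader side: would replace the while(fPTsBuf->buf_writing != 0) loop.
void wait_until_stable(t_TS_Buffer *p_tsbuf)
{
    pthread_mutex_lock(&ts_buf_lock);
    while (p_tsbuf->buf_writing != 0)
        pthread_cond_wait(&ts_buf_ready, &ts_buf_lock);  // sleeps instead of spinning
    pthread_mutex_unlock(&ts_buf_lock);
}

// Writer side: the capture thread would call this after filling the buffer.
void mark_stable(t_TS_Buffer *p_tsbuf)
{
    pthread_mutex_lock(&ts_buf_lock);
    p_tsbuf->buf_writing = 0;
    pthread_cond_signal(&ts_buf_ready);
    pthread_mutex_unlock(&ts_buf_lock);
}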


3) The MPEG2TransportLiveServerMediaSubsession class

    File: MPEG2TransportLiveServerMediaSubsession.hh

    Note: modeled on the MPEG2TransportFileServerMediaSubsession class, with the code we don't need removed.

#ifndef _MPEG2_TRANSPORT_LIVE_SERVER_MEDIA_SUBSESSION_HH
#define _MPEG2_TRANSPORT_LIVE_SERVER_MEDIA_SUBSESSION_HH

#ifndef _ON_DEMAND_SERVER_MEDIA_SUBSESSION_HH
#include "OnDemandServerMediaSubsession.hh"
#endif
#ifndef _MPEG2_TRANSPORT_STREAM_FRAMER_HH
#include "MPEG2TransportStreamFramer.hh"
#endif
#ifndef _BYTE_STREAM_LIVE_SOURCE_HH
#include "ByteStreamLiveSource.hh"
#endif
#ifndef _MPEG2_TRANSPORT_STREAM_TRICK_MODE_FILTER_HH
#include "MPEG2TransportStreamTrickModeFilter.hh"
#endif
#ifndef _MPEG2_TRANSPORT_STREAM_FROM_ES_SOURCE_HH
#include "MPEG2TransportStreamFromESSource.hh"
#endif

#ifndef __LIVE_TS_BUFFER_TYPE_HH
#include "LiveTSType.hh"
#endif

//class ClientTrickPlayState; // forward

class MPEG2TransportLiveServerMediaSubsession: public OnDemandServerMediaSubsession
{
public:
static MPEG2TransportLiveServerMediaSubsession* createNew(UsageEnvironment& env,
Boolean reuseFirstSource,
t_TS_Buffer *p_tsbuf);

protected:
MPEG2TransportLiveServerMediaSubsession(UsageEnvironment& env,
Boolean reuseFirstSource,
t_TS_Buffer *p_tsbuf);
// called only by createNew();
virtual ~MPEG2TransportLiveServerMediaSubsession();

//virtual ClientTrickPlayState* newClientTrickPlayState();

private: // redefined virtual functions
// Note that because - to implement 'trick play' operations - we're operating on
// more than just the input source, we reimplement some functions that are
// already implemented in "OnDemandServerMediaSubsession", rather than
// reimplementing "seekStreamSource()" and "setStreamSourceScale()":
virtual void startStream(unsigned clientSessionId, void* streamToken,
TaskFunc* rtcpRRHandler,
void* rtcpRRHandlerClientData,
unsigned short& rtpSeqNum,
unsigned& rtpTimestamp,
ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
void* serverRequestAlternativeByteHandlerClientData);
virtual void pauseStream(unsigned clientSessionId, void* streamToken);
virtual void seekStream(unsigned clientSessionId, void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes);
virtual void setStreamScale(unsigned clientSessionId, void* streamToken, float scale);
virtual void deleteStream(unsigned clientSessionId, void*& streamToken);

// The virtual functions that are usually implemented by "ServerMediaSubsession"s:
virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
unsigned& estBitrate);
virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
unsigned char rtpPayloadTypeIfDynamic,
FramedSource* inputSource);

virtual void testScaleFactor(float& scale);
virtual float duration() const;

private:
//ClientTrickPlayState* lookupClient(unsigned clientSessionId);

private:
//    MPEG2TransportStreamIndexFile* fIndexFile;
float fDuration;
HashTable* fClientSessionHashTable; // indexed by client session id

// The live ts buffer, added by zhaoyulei, 2015.06.01
t_TS_Buffer *fPTsBuf;  // pointer to the shared live TS buffer

};

#endif

    File: MPEG2TransportLiveServerMediaSubsession.cpp

    Note: again modeled on MPEG2TransportFileServerMediaSubsession, with the unneeded code removed.

#include "MPEG2TransportLiveServerMediaSubsession.hh"
#include "SimpleRTPSink.hh"
#include "LiveTSType.hh"
#include <zyl_debug.h>

MPEG2TransportLiveServerMediaSubsession*
MPEG2TransportLiveServerMediaSubsession::createNew(
UsageEnvironment&   env,
Boolean             reuseFirstSource,
t_TS_Buffer*        p_tsbuf)
{
// printf("=== zyl ===, %s, %d\n",__FILE__, __LINE__);

MPEG2TransportLiveServerMediaSubsession* newpSubsession =
new MPEG2TransportLiveServerMediaSubsession(env,
reuseFirstSource, p_tsbuf);
//newpSubsession->fPTsBuf = p_tsbuf;
return newpSubsession;
}

MPEG2TransportLiveServerMediaSubsession
::MPEG2TransportLiveServerMediaSubsession(
UsageEnvironment&               env,
Boolean                         reuseFirstSource,
t_TS_Buffer*                    p_tsbuf)
: OnDemandServerMediaSubsession(env, reuseFirstSource),
fDuration(0.0), fClientSessionHashTable(NULL), fPTsBuf(p_tsbuf)
{

}

MPEG2TransportLiveServerMediaSubsession
::~MPEG2TransportLiveServerMediaSubsession()
{
}

#define TRANSPORT_PACKET_SIZE 188
#define TRANSPORT_PACKETS_PER_NETWORK_PACKET 7
// The product of these two numbers must be enough to fit within a network packet

void MPEG2TransportLiveServerMediaSubsession
::startStream(unsigned clientSessionId, void* streamToken, TaskFunc* rtcpRRHandler,
void* rtcpRRHandlerClientData, unsigned short& rtpSeqNum,
unsigned& rtpTimestamp,
ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
void* serverRequestAlternativeByteHandlerClientData)
{
// Call the original, default version of this routine:
OnDemandServerMediaSubsession::startStream(clientSessionId, streamToken,
rtcpRRHandler, rtcpRRHandlerClientData,
rtpSeqNum, rtpTimestamp,
serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
}

void MPEG2TransportLiveServerMediaSubsession
::pauseStream(unsigned clientSessionId, void* streamToken)
{
// Call the original, default version of this routine:
OnDemandServerMediaSubsession::pauseStream(clientSessionId, streamToken);
}

void MPEG2TransportLiveServerMediaSubsession
::seekStream(unsigned clientSessionId, void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes)
{
// Begin by calling the original, default version of this routine:
OnDemandServerMediaSubsession::seekStream(clientSessionId, streamToken, seekNPT, streamDuration, numBytes);

}

void MPEG2TransportLiveServerMediaSubsession
::setStreamScale(unsigned clientSessionId, void* streamToken, float scale)
{

// Call the original, default version of this routine:
OnDemandServerMediaSubsession::setStreamScale(clientSessionId, streamToken, scale);
}

void MPEG2TransportLiveServerMediaSubsession
::deleteStream(unsigned clientSessionId, void*& streamToken)
{

// Call the original, default version of this routine:
OnDemandServerMediaSubsession::deleteStream(clientSessionId, streamToken);
}

FramedSource* MPEG2TransportLiveServerMediaSubsession
::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate)
{
// Create the video source:
unsigned const inputDataChunkSize
= TRANSPORT_PACKETS_PER_NETWORK_PACKET*TRANSPORT_PACKET_SIZE;

// Use our own live source instead of a file source
ByteStreamLiveSource* liveSource
= ByteStreamLiveSource::createNew(envir(), fPTsBuf,inputDataChunkSize);

estBitrate = 5000; // kbps, estimate

// Create a framer for the Transport Stream:
MPEG2TransportStreamFramer* framer
= MPEG2TransportStreamFramer::createNew(envir(), liveSource);

return framer;
}

RTPSink* MPEG2TransportLiveServerMediaSubsession
::createNewRTPSink(Groupsock* rtpGroupsock,
unsigned char /*rtpPayloadTypeIfDynamic*/,
FramedSource* /*inputSource*/)
{
return SimpleRTPSink::createNew(envir(), rtpGroupsock,
33, 90000, "video", "MP2T",
1, True, False /*no 'M' bit*/);
}

void MPEG2TransportLiveServerMediaSubsession::testScaleFactor(float& scale)
{
scale = 1.0f;
}

float MPEG2TransportLiveServerMediaSubsession::duration() const
{
return fDuration;
}


    With that, the two classes we need are in place, and the bulk of the work is done.
4. Modifying the DynamicRTSPServer class to register our media type

1) Modify DynamicRTSPServer.hh: add an overloaded createNew() member function and a private t_TS_Buffer * member

#ifndef _DYNAMIC_RTSP_SERVER_HH
#define _DYNAMIC_RTSP_SERVER_HH

#ifndef _RTSP_SERVER_SUPPORTING_HTTP_STREAMING_HH
#include "RTSPServerSupportingHTTPStreaming.hh"
#endif

#ifndef __LIVE_TS_BUFFER_TYPE_HH
#include "LiveTSType.hh"
#endif

class DynamicRTSPServer: public RTSPServerSupportingHTTPStreaming {
public:
static DynamicRTSPServer* createNew(UsageEnvironment& env, Port ourPort,
UserAuthenticationDatabase* authDatabase,
unsigned reclamationTestSeconds = 65);

// New createNew() overload that accepts a t_TS_Buffer pointer
static DynamicRTSPServer* createNew(UsageEnvironment& env, Port ourPort,
UserAuthenticationDatabase* authDatabase,
t_TS_Buffer *p_tsbuf,
unsigned reclamationTestSeconds = 65);

protected:
DynamicRTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds);
// called only by createNew();
virtual ~DynamicRTSPServer();

protected: // redefined virtual functions
virtual ServerMediaSession*
lookupServerMediaSession(char const* streamName, Boolean isFirstLookupInSession);

private:
// The live ts buffer, added by zhaoyulei, 2015.06.01
// pointer to the shared t_TS_Buffer
t_TS_Buffer *fPTsBuf;
};

#endif


2) Modify DynamicRTSPServer.cpp

#include "DynamicRTSPServer.hh"
#include <liveMedia.hh>
#include <string.h>

DynamicRTSPServer*
DynamicRTSPServer::createNew(UsageEnvironment& env, Port ourPort,
UserAuthenticationDatabase* authDatabase,
unsigned reclamationTestSeconds) {
int ourSocket = setUpOurSocket(env, ourPort);
if (ourSocket == -1) return NULL;

return new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
}

// The new createNew() overload
DynamicRTSPServer*
DynamicRTSPServer::createNew(UsageEnvironment& env, Port ourPort,
UserAuthenticationDatabase* authDatabase,
t_TS_Buffer *p_tsbuf,
unsigned reclamationTestSeconds)
{
int ourSocket = setUpOurSocket(env, ourPort);
if (ourSocket == -1) return NULL;

DynamicRTSPServer* newDynamicRTSPServer =
new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
newDynamicRTSPServer->fPTsBuf = p_tsbuf;
return newDynamicRTSPServer;
}

DynamicRTSPServer::DynamicRTSPServer(UsageEnvironment& env, int ourSocket,
Port ourPort,
UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds)
: RTSPServerSupportingHTTPStreaming(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds),
fPTsBuf(NULL)
{
}

DynamicRTSPServer::~DynamicRTSPServer() {
}

static ServerMediaSession* createNewSMS(UsageEnvironment& env,
char const* fileName, FILE* fid, t_TS_Buffer *p_tsbuf); // forward

ServerMediaSession* DynamicRTSPServer
::lookupServerMediaSession(char const* streamName, Boolean isFirstLookupInSession)
{
// First, check whether the specified "streamName" exists as a local file:

FILE* fid = fopen(streamName, "rb");
Boolean fileExists = fid != NULL;
// Handle our new stream type: the literal name "live" always counts as existing
if (strcmp(streamName, "live") == 0)
{
fileExists = 1;
}

// Next, check whether we already have a "ServerMediaSession" for this file:
ServerMediaSession* sms = RTSPServer::lookupServerMediaSession(streamName);
Boolean smsExists = sms != NULL;

// Handle the four possibilities for "fileExists" and "smsExists":
if (!fileExists) {
if (smsExists) {
// "sms" was created for a file that no longer exists. Remove it:
removeServerMediaSession(sms);
sms = NULL;
}

return NULL;
} else {
if (smsExists && isFirstLookupInSession) {
// Remove the existing "ServerMediaSession" and create a new one, in case the underlying
// file has changed in some way:
removeServerMediaSession(sms);
sms = NULL;
}
// Create the session (this covers the new stream type as well)
if (sms == NULL)
{
sms = createNewSMS(envir(), streamName, fid, fPTsBuf);
addServerMediaSession(sms);
}
if(fileExists && (strcmp(streamName, "live") != 0))
{
fclose(fid);
}
return sms;
}
}

// Special code for handling Matroska files:
struct MatroskaDemuxCreationState {
MatroskaFileServerDemux* demux;
char watchVariable;
};
static void onMatroskaDemuxCreation(MatroskaFileServerDemux* newDemux, void* clientData) {
MatroskaDemuxCreationState* creationState = (MatroskaDemuxCreationState*)clientData;
creationState->demux = newDemux;
creationState->watchVariable = 1;
}
// END Special code for handling Matroska files:

// Special code for handling Ogg files:
struct OggDemuxCreationState {
OggFileServerDemux* demux;
char watchVariable;
};
static void onOggDemuxCreation(OggFileServerDemux* newDemux, void* clientData) {
OggDemuxCreationState* creationState = (OggDemuxCreationState*)clientData;
creationState->demux = newDemux;
creationState->watchVariable = 1;
}
// END Special code for handling Ogg files:

#define NEW_SMS(description) do {\
char const* descStr = description\
", streamed by the LIVE555 Media Server";\
sms = ServerMediaSession::createNew(env, fileName, fileName, descStr);\
} while(0)

static ServerMediaSession* createNewSMS(UsageEnvironment& env,
char const* fileName, FILE* /*fid*/, t_TS_Buffer *p_tsbuf) {
// Use the file name extension to determine the type of "ServerMediaSession":
char const* extension = strrchr(fileName, '.');
if (strcmp(fileName, "live") == 0)
{
extension = "live";
}

if (extension == NULL) return NULL;

ServerMediaSession* sms = NULL;
Boolean const reuseSource = False;
if (strcmp(extension, ".aac") == 0) {
// Assumed to be an AAC Audio (ADTS format) file:
NEW_SMS("AAC Audio");
sms->addSubsession(ADTSAudioFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".amr") == 0) {
// Assumed to be an AMR Audio file:
NEW_SMS("AMR Audio");
sms->addSubsession(AMRAudioFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".ac3") == 0) {
// Assumed to be an AC-3 Audio file:
NEW_SMS("AC-3 Audio");
sms->addSubsession(AC3AudioFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".m4e") == 0) {
// Assumed to be a MPEG-4 Video Elementary Stream file:
NEW_SMS("MPEG-4 Video");
sms->addSubsession(MPEG4VideoFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".264") == 0) {
// Assumed to be a H.264 Video Elementary Stream file:
NEW_SMS("H.264 Video");
OutPacketBuffer::maxSize = 100000; // allow for some possibly large H.264 frames
sms->addSubsession(H264VideoFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".265") == 0) {
// Assumed to be a H.265 Video Elementary Stream file:
NEW_SMS("H.265 Video");
OutPacketBuffer::maxSize = 100000; // allow for some possibly large H.265 frames
sms->addSubsession(H265VideoFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".mp3") == 0) {
// Assumed to be a MPEG-1 or 2 Audio file:
NEW_SMS("MPEG-1 or 2 Audio");
// To stream using 'ADUs' rather than raw MP3 frames, uncomment the following:
//#define STREAM_USING_ADUS 1
// To also reorder ADUs before streaming, uncomment the following:
//#define INTERLEAVE_ADUS 1
// (For more information about ADUs and interleaving,
//  see <http://www.live555.com/rtp-mp3/>)
Boolean useADUs = False;
Interleaving* interleaving = NULL;
#ifdef STREAM_USING_ADUS
useADUs = True;
#ifdef INTERLEAVE_ADUS
unsigned char interleaveCycle[] = {0,2,1,3}; // or choose your own...
unsigned const interleaveCycleSize
= (sizeof interleaveCycle)/(sizeof (unsigned char));
interleaving = new Interleaving(interleaveCycleSize, interleaveCycle);
#endif
#endif
sms->addSubsession(MP3AudioFileServerMediaSubsession::createNew(env, fileName, reuseSource, useADUs, interleaving));
} else if (strcmp(extension, ".mpg") == 0) {
// Assumed to be a MPEG-1 or 2 Program Stream (audio+video) file:
NEW_SMS("MPEG-1 or 2 Program Stream");
MPEG1or2FileServerDemux* demux
= MPEG1or2FileServerDemux::createNew(env, fileName, reuseSource);
sms->addSubsession(demux->newVideoServerMediaSubsession());
sms->addSubsession(demux->newAudioServerMediaSubsession());
} else if (strcmp(extension, ".vob") == 0) {
// Assumed to be a VOB (MPEG-2 Program Stream, with AC-3 audio) file:
NEW_SMS("VOB (MPEG-2 video with AC-3 audio)");
MPEG1or2FileServerDemux* demux
= MPEG1or2FileServerDemux::createNew(env, fileName, reuseSource);
sms->addSubsession(demux->newVideoServerMediaSubsession());
sms->addSubsession(demux->newAC3AudioServerMediaSubsession());
} else if (strcmp(extension, ".ts") == 0) {
// Assumed to be a MPEG Transport Stream file:
// Use an index file name that's the same as the TS file name, except with ".tsx":
unsigned indexFileNameLen = strlen(fileName) + 2; // allow for trailing "x\0"
char* indexFileName = new char[indexFileNameLen];
sprintf(indexFileName, "%sx", fileName);
NEW_SMS("MPEG Transport Stream");
sms->addSubsession(MPEG2TransportFileServerMediaSubsession::createNew(env, fileName, indexFileName, reuseSource));
delete[] indexFileName;
} else if (strcmp(extension, ".wav") == 0) {
// Assumed to be a WAV Audio file:
NEW_SMS("WAV Audio Stream");
// To convert 16-bit PCM data to 8-bit u-law, prior to streaming,
// change the following to True:
Boolean convertToULaw = False;
sms->addSubsession(WAVAudioFileServerMediaSubsession::createNew(env, fileName, reuseSource, convertToULaw));
} else if (strcmp(extension, ".dv") == 0) {
// Assumed to be a DV Video file
// First, make sure that the RTPSinks' buffers will be large enough to handle the huge size of DV frames (as big as 288000).
OutPacketBuffer::maxSize = 300000;

NEW_SMS("DV Video");
sms->addSubsession(DVVideoFileServerMediaSubsession::createNew(env, fileName, reuseSource));
} else if (strcmp(extension, ".mkv") == 0 || strcmp(extension, ".webm") == 0) {
// Assumed to be a Matroska file (note that WebM ('.webm') files are also Matroska files)
OutPacketBuffer::maxSize = 100000; // allow for some possibly large VP8 or VP9 frames
NEW_SMS("Matroska video+audio+(optional)subtitles");

// Create a Matroska file server demultiplexor for the specified file.
// (We enter the event loop to wait for this to complete.)
MatroskaDemuxCreationState creationState;
creationState.watchVariable = 0;
MatroskaFileServerDemux::createNew(env, fileName, onMatroskaDemuxCreation, &creationState);
env.taskScheduler().doEventLoop(&creationState.watchVariable);

ServerMediaSubsession* smss;
while ((smss = creationState.demux->newServerMediaSubsession()) != NULL) {
sms->addSubsession(smss);
}
} else if (strcmp(extension, ".ogg") == 0 || strcmp(extension, ".ogv") == 0 || strcmp(extension, ".opus") == 0) {
// Assumed to be an Ogg file
NEW_SMS("Ogg video and/or audio");

// Create a Ogg file server demultiplexor for the specified file.
// (We enter the event loop to wait for this to complete.)
OggDemuxCreationState creationState;
creationState.watchVariable = 0;
OggFileServerDemux::createNew(env, fileName, onOggDemuxCreation, &creationState);
env.taskScheduler().doEventLoop(&creationState.watchVariable);

ServerMediaSubsession* smss;
while ((smss = creationState.demux->newServerMediaSubsession()) != NULL) {
sms->addSubsession(smss);
}
}
// Add the new media type "live"; unlike the others it is matched by the full stream name, not a file-name extension
else if (strcmp(extension, "live") == 0)
{
if (p_tsbuf != NULL)
{
NEW_SMS("MPEG2 Live Transport Stream");
sms->addSubsession(MPEG2TransportLiveServerMediaSubsession::createNew(env,reuseSource, p_tsbuf));
}
}

return sms;
}

5. Implementing the caller

1) The changes are based on the main() function of live555MediaServer.cpp.

    First, add a header file, live555MediaServer.h:

#ifndef __TCB_LIVE555_MEDIA_SERVER_HH_
#define __TCB_LIVE555_MEDIA_SERVER_HH_

#include "LiveTSType.hh"

#ifdef __cplusplus
extern "C"
{
#endif

int live555ms_play(t_TS_Buffer *p_tsbuf);

#ifdef __cplusplus
}
#endif

#endif

    Then modify live555MediaServer.cpp:

#include <BasicUsageEnvironment.hh>
#include "DynamicRTSPServer.hh"
#include "version.hh"
#include <liveMedia.hh>

#include "live555MediaServer.h"
#include "LiveTSType.hh"
#include <zyl_debug.h>

#include <pthread.h>

#define ZYL_DEBUG_LEVEL 5

static int live555mediaserver(t_TS_Buffer *p_tsbuf)
{
// Set up the usage environment
// Create the task scheduler
TaskScheduler* scheduler = BasicTaskScheduler::createNew();
// Create the usage environment
UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

// User authentication database pointer
UserAuthenticationDatabase* authDB = NULL;
#ifdef ACCESS_CONTROL
// Create the database
authDB = new UserAuthenticationDatabase;
// Add a user record
authDB->addUserRecord("username1", "password1");
#endif

// Create the RTSP server: try port 554 first; if that fails, fall back to 8554
RTSPServer* rtspServer;
portNumBits rtspServerPortNum = 554;
rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB, p_tsbuf);
if (rtspServer == NULL)
{
rtspServerPortNum = 8554;
rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB, p_tsbuf);
}
if (rtspServer == NULL)
{
*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
exit(1);
}

*env << "TCB LIVE555 Media Server\n";
*env << "\tversion " << MEDIA_SERVER_VERSION_STRING
<< " (LIVE555 Streaming Media library version "
<< LIVEMEDIA_LIBRARY_VERSION_STRING << ").\n";

char* urlPrefix = rtspServer->rtspURLPrefix();
*env << "Play streams from this server using the URL\n\t"
<< urlPrefix << "<filename>\nwhere <filename> is a file present in the current directory.\n";
*env << "Each file's type is inferred from its name suffix:\n";
*env << "\t\".264\" => a H.264 Video Elementary Stream file\n";
*env << "\t\".265\" => a H.265 Video Elementary Stream file\n";
*env << "\t\".aac\" => an AAC Audio (ADTS format) file\n";
*env << "\t\".ac3\" => an AC-3 Audio file\n";
*env << "\t\".amr\" => an AMR Audio file\n";
*env << "\t\".dv\" => a DV Video file\n";
*env << "\t\".m4e\" => a MPEG-4 Video Elementary Stream file\n";
*env << "\t\".mkv\" => a Matroska audio+video+(optional)subtitles file\n";
*env << "\t\".mp3\" => a MPEG-1 or 2 Audio file\n";
*env << "\t\".mpg\" => a MPEG-1 or 2 Program Stream (audio+video) file\n";
*env << "\t\".ogg\" or \".ogv\" or \".opus\" => an Ogg audio and/or video file\n";
*env << "\t\".ts\" => a MPEG Transport Stream file\n";
*env << "\t\t(a \".tsx\" index file - if present - provides server 'trick play' support)\n";
*env << "\t\".vob\" => a VOB (MPEG-2 video with AC-3 audio) file\n";
*env << "\t\".wav\" => a WAV Audio file\n";
*env << "\t\"live\" => a live ts profram\n";
*env << "\t\".webm\" => a WebM audio(Vorbis)+video(VP8) file\n";
*env << "See http://www.live555.com/mediaServer/ for additional documentation.\n";

// Also, attempt to create a HTTP server for RTSP-over-HTTP tunneling.
// Try first with the default HTTP port (80), and then with the alternative HTTP
// port numbers (8000 and 8080).

if (rtspServer->setUpTunnelingOverHTTP(80) || rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080))
{
*env << "(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling, or for HTTP live streaming (for indexed Transport Stream files only).)\n";
}
else
{
*env << "(RTSP-over-HTTP tunneling is not available.)\n";
}

env->taskScheduler().doEventLoop(); // does not return

return 0; // only to prevent compiler warning
}
// Main entry point for the Live555 side
int live555ms_play(t_TS_Buffer *p_tsbuf)
{
printf("=== zyl ===, %s, %d\n",__FILE__, __LINE__);
//return live_ts_rtsp_server(p_tsbuf);
return live555mediaserver(p_tsbuf);
}

6. Invoking the server
    That covers all of the Live555-side changes. To use them, call live555ms_play(t_TS_Buffer *p_tsbuf),
passing the pointer to the buffer that holds the captured data; clients can then open rtsp://<server-ip>:554/live (or port 8554 if 554 was unavailable). Note that Live555 needs its own thread, because live555ms_play() never returns. A minimal sketch follows.
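    Below, dvb_capture_loop() is a hypothetical stand-in for the DVB-side capture code that keeps filling the shared buffer (as sketched in section 3); only live555ms_play() and t_TS_Buffer come from the code above.

// Hypothetical caller: one thread feeds the buffer, another runs Live555.
#include <pthread.h>
#include "live555MediaServer.h"
#include "LiveTSType.hh"

static t_TS_Buffer g_tsbuf;

extern void *dvb_capture_loop(void *arg);    // fills g_tsbuf; assumed to exist elsewhere

static void *live555_thread(void *arg)
{
    live555ms_play((t_TS_Buffer *)arg);      // never returns
    return NULL;
}

int main(void)
{
    pthread_t capture_tid, server_tid;

    pthread_create(&capture_tid, NULL, dvb_capture_loop, &g_tsbuf);
    pthread_create(&server_tid, NULL, live555_thread, &g_tsbuf);

    pthread_join(server_tid, NULL);          // in practice, never reached
    return 0;
}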
Tags: live555, TS, live streaming