您的位置:首页 > 其它

【Netty4 简单项目实践】十一、用Netty分发mpegts到websocket接口

2017-12-18 15:30 901 查看
【前言】

推视频流的时候,rtmp会有3秒的延迟。目前有一种解决方案是用mpegts的格式解决。如果考虑用ffmpeg来推流的话,可以使用http格式和udp格式来推流。现在要做的事情是用Netty来转发rtmp到websocket接口上,然后用H5来播放。播放的插件使用jsmpeg这个项目来实现。

【ffmpeg推mpegts】

ffmpeg推流支持http和udp两种协议,目前还不支持websocket的方式。所以就打算用Netty做协议转发。假定本地接收流地址是 http://localhost:9090 在Mac上推屏幕上的画面可以用下面的命令

ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -b 0
http://localhost:9090

如果是推UDP的话,假定也是推到localhost,端口9094,可以使用下面的命令

ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -b 0 udp://localhost:9094

上面这两种方式都没有加入音频编码,如果要包含音频的话,需要指定音频方式

ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -acodec libfaac -b 0 udp://localhost:9094

【尝试HTTP推流】
按jsmpeg的教程,他把ffmpeg流推给nginx,让nginx转发到websocket上,而且并没有修改nginx的模块,所以我想如果把ffmpeg推的数据buff直接转发给websocket应该是可行的。至少http是可行的。那么下来要做的事情就是把Http的报文去掉头,只转发response body就可以了。真的是这样么?

我用Netty建了一个bootstrap,包含监听的EventLoopGroup和传输的EventLoopGroup,就是典型的一个服务bootstrap类型
ServerBootstrap,然后配置TCP模式设置一下TCP的nodelay,收发缓冲区大小等参数

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_SNDBUF, 1024*256)
.option(ChannelOption.SO_RCVBUF, 1024*256)
.option(ChannelOption.TCP_NODELAY, true);

之后没有加载任何的编解码器,直接把处理Handler加上去。

bootstrap.group(bossLoop, workerLoop)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("logging", new LoggingHandler(LogLevel.DEBUG));
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new MpegTsHandler());
}
});

这个处理Handler也超级简单,就是把消息分发到ws的连接组里就完事了。注意Handler用的是ByteBuf,并不需要解析成Http协议。

public class MpegTsHandler extends SimpleChannelInboundHandler<ByteBuf>{

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 转发字节流
PlayerGroup.broadCast(msg);
}
}
而其中的PlayerGroup,是一个channelGroup。注意其中在广播的时候retain了一下。

public class PlayerGroup {
static private ChannelGroup channelGroup
= new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

static public void addChannel(Channel channel) {
channelGroup.add(channel);
}

static public void removeChannel(Channel channel) {
channelGroup.remove(channel);
}

static public void broadCast(ByteBuf message) {
if (channelGroup == null || channelGroup.isEmpty()) {
return;
}
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(message);
message.retain();

channelGroup.writeAndFlush(frame);
}

static public void destory() {
if (channelGroup == null || channelGroup.isEmpty()) {
return;
}
channelGroup.close();
}
}
这样做是可以播放的。只是里面含有很多HTTP头部字节。至于把HTTP头部都去掉能不能行,没有尝试。因为无论转发时是否去掉,在接收的时候同样是多了很多的开销,所以后面我转到UDP方式的推流。

【UDP推流处理】
UDP方式的server要用NioDatagramChannel,也不需要编解码模块,直接配上处理Handler

EventLoopGroup bossLoop = null;
try {
bossLoop = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);

bootstrap
.group(bossLoop)
.option(ChannelOption.SO_BROADCAST, true) // 支持广播
.option(ChannelOption.SO_SNDBUF, 1024 * 256)
.option(ChannelOption.SO_RCVBUF, 1024 * 256);

bootstrap
.handler(new UdpMpegTsHandler());

ChannelFuture future = bootstrap.bind(port).sync();

if (future.isSuccess()) {
System.out.println("UDP stream server start at port: " + port + ".");
}
future.channel().closeFuture().await();
} catch (Exception e) {
} finally {
if (bossLoop != null) {
bossLoop.shutdownGracefully();
}
}


Handler也超级简单,唯一需要注意的是Handler接收的数据类型不再是ByteBuf,而是DatagramPacket

public class UdpMpegTsHandler extends SimpleChannelInboundHandler<DatagramPacket> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
PlayerGroup.broadCast(msg.content());
}
}
同样的是转发到group里面去广播。唯一需要注意的是要取出DatagramPacket中的ByteBuf

【websocket】
websocket阶段和之前写的ws稍微一点不一样。这里是用ws传输二进制,所以ws的数据格式是BinaryWebSocketFrame。Server加载的编解码器看起来像这样:

bootstrap.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("readTimeout", new ReadTimeoutHandler(45)); // 长时间不写会断
ch.pipeline().addLast("HttpServerCodec", new HttpServerCodec());
ch.pipeline().addLast("ChunkedWriter", new ChunkedWriteHandler());
ch.pipeline().addLast("HttpAggregator", new HttpObjectAggregator(65535));
ch.pipeline().addLast("WsProtocolHandler",
new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));
ch.pipeline().addLast("WsBinaryDecoder", new WebSocketFrameDecoder()); // ws解码成字节
ch.pipeline().addLast("WsEncoder", new WebSocketFramePrepender()); // 字节编码成ws
ch.pipeline().addLast(new VideoPlayerHandler());
}
});


WebSocketFrameDecoder是自定义的解码器。WebSocketFramePrepender是自定义的编码器。VideoPlayerHandler是自定义处理Handler。
先看解码器WebSocketFrameDecoder,这里我用直接内存了。

public class WebSocketFrameDecoder extends MessageToMessageDecoder<WebSocketFrame> {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
ByteBuf buff = msg.content();
byte[] messageBytes = new byte[buff.readableBytes()];
buff.readBytes(messageBytes);
// TODO: 直接内存小心
ByteBuf bytebuf = PooledByteBufAllocator.DEFAULT.buffer(); // 直接内存
bytebuf.writeBytes(messageBytes);
out.add(bytebuf.retain());
}
}
再看编码器,按之前说的编码成二进制流,而不是字符流

public class WebSocketFramePrepender extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
WebSocketFrame webSocketFrame = new BinaryWebSocketFrame(msg);
out.add(webSocketFrame);
}
}
最后的Handler也超级简单,就是把channel加到group里面

public class VideoPlayerHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ServerLogger.log("ws 连接 ctx:" + ctx);
PlayerGroup.addChannel(ctx.channel());
}
}


这样就通过ChannelGroup打通了UDP到ws的通道。

【结束语】
UDP的效果比较好,推HTTP的时候经常花屏,最后附带说一下jsmpeg的选项,如果播放效果不好,需要调整jsmpeg的选项。

<!DOCTYPE html>
<html>
<head>
<title>JSMpeg Stream Client</title>
<style type="text/css">
html, body {
background-color: #111;
text-align: center;
}
</style>

</head>
<body>
<canvas id="video-canvas"></canvas>
<script type="text/javascript" src="js/jsmpeg.min.js"></script>
<script type="text/javascript">
var canvas = document.getElementById('video-canvas');
var url = 'ws://'+'127.0.0.1:9092/wsEntry';
var config = {
canvas: canvas,
autoplay: true,
audio: false,
video: true,
protocols: 'haofei',
};
var player = new JSMpeg.Player(url, config);
</script>
</body>
</html>


注意config里面的protocols,记得和ws的server里面配置一致

ch.pipeline().addLast("WsProtocolHandler",
new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));
这个子协议可以用来做鉴权神马的,不过记得不能填"" 即空字符串,因为在Netty里不把空字符串当成子协议。这里不知道算不算Netty的bug。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty 视频流