【Netty4 简单项目实践】十一、用Netty分发mpegts到websocket接口
2017-12-18 15:30
901 查看
【前言】
推视频流的时候,rtmp会有3秒的延迟。目前有一种解决方案是用mpegts的格式解决。如果考虑用ffmpeg来推流的话,可以使用http格式和udp格式来推流。现在要做的事情是用Netty来转发rtmp到websocket接口上,然后用H5来播放。播放的插件使用jsmpeg这个项目来实现。
【ffmpeg推mpegts】
ffmpeg推流支持http和udp两种协议,目前还不支持websocket的方式。所以就打算用Netty做协议转发。假定本地接收流地址是 http://localhost:9090 在Mac上推屏幕上的画面可以用下面的命令
ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -b 0
http://localhost:9090
如果是推UDP的话,假定也是推到localhost,端口9094,可以使用下面的命令
ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -b 0 udp://localhost:9094
上面这两种方式都没有加入音频编码,如果要包含音频的话,需要指定音频方式
ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -acodec libfaac -b 0 udp://localhost:9094
【尝试HTTP推流】
按jsmpeg的教程,他把ffmpeg流推给nginx,让nginx转发到websocket上,而且并没有修改nginx的模块,所以我想如果把ffmpeg推的数据buff直接转发给websocket应该是可行的。至少http是可行的。那么下来要做的事情就是把Http的报文去掉头,只转发response body就可以了。真的是这样么?
我用Netty建了一个bootstrap,包含监听的EventLoopGroup和传输的EventLoopGroup,就是典型的一个服务bootstrap类型
ServerBootstrap,然后配置TCP模式设置一下TCP的nodelay,收发缓冲区大小等参数
之后没有加载任何的编解码器,直接把处理Handler加上去。
这个处理Handler也超级简单,就是把消息分发到ws的连接组里就完事了。注意Handler用的是ByteBuf,并不需要解析成Http协议。
【UDP推流处理】
UDP方式的server要用NioDatagramChannel,也不需要编解码模块,直接配上处理Handler
Handler也超级简单,唯一需要注意的是Handler接收的数据类型不再是ByteBuf,而是DatagramPacket
【websocket】
websocket阶段和之前写的ws稍微一点不一样。这里是用ws传输二进制,所以ws的数据格式是BinaryWebSocketFrame。Server加载的编解码器看起来像这样:
WebSocketFrameDecoder是自定义的解码器。WebSocketFramePrepender是自定义的编码器。VideoPlayerHandler是自定义处理Handler。
先看解码器WebSocketFrameDecoder,这里我用直接内存了。
这样就通过ChannelGroup打通了UDP到ws的通道。
【结束语】
UDP的效果比较好,推HTTP的时候经常花屏,最后附带说一下jsmpeg的选项,如果播放效果不好,需要调整jsmpeg的选项。
注意config里面的protocols,记得和ws的server里面配置一致
推视频流的时候,rtmp会有3秒的延迟。目前有一种解决方案是用mpegts的格式解决。如果考虑用ffmpeg来推流的话,可以使用http格式和udp格式来推流。现在要做的事情是用Netty来转发rtmp到websocket接口上,然后用H5来播放。播放的插件使用jsmpeg这个项目来实现。
【ffmpeg推mpegts】
ffmpeg推流支持http和udp两种协议,目前还不支持websocket的方式。所以就打算用Netty做协议转发。假定本地接收流地址是 http://localhost:9090 在Mac上推屏幕上的画面可以用下面的命令
ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -b 0
http://localhost:9090
如果是推UDP的话,假定也是推到localhost,端口9094,可以使用下面的命令
ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -b 0 udp://localhost:9094
上面这两种方式都没有加入音频编码,如果要包含音频的话,需要指定音频方式
ffmpeg -f avfoundation -i "1" -vcodec libx264 -f mpegts -codec:v mpeg1video -acodec libfaac -b 0 udp://localhost:9094
【尝试HTTP推流】
按jsmpeg的教程,他把ffmpeg流推给nginx,让nginx转发到websocket上,而且并没有修改nginx的模块,所以我想如果把ffmpeg推的数据buff直接转发给websocket应该是可行的。至少http是可行的。那么下来要做的事情就是把Http的报文去掉头,只转发response body就可以了。真的是这样么?
我用Netty建了一个bootstrap,包含监听的EventLoopGroup和传输的EventLoopGroup,就是典型的一个服务bootstrap类型
ServerBootstrap,然后配置TCP模式设置一下TCP的nodelay,收发缓冲区大小等参数
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_SNDBUF, 1024*256) .option(ChannelOption.SO_RCVBUF, 1024*256) .option(ChannelOption.TCP_NODELAY, true);
之后没有加载任何的编解码器,直接把处理Handler加上去。
bootstrap.group(bossLoop, workerLoop) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("logging", new LoggingHandler(LogLevel.DEBUG)); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new MpegTsHandler()); } });
这个处理Handler也超级简单,就是把消息分发到ws的连接组里就完事了。注意Handler用的是ByteBuf,并不需要解析成Http协议。
public class MpegTsHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // 转发字节流 PlayerGroup.broadCast(msg); } }而其中的PlayerGroup,是一个channelGroup。注意其中在广播的时候retain了一下。
public class PlayerGroup { static private ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); static public void addChannel(Channel channel) { channelGroup.add(channel); } static public void removeChannel(Channel channel) { channelGroup.remove(channel); } static public void broadCast(ByteBuf message) { if (channelGroup == null || channelGroup.isEmpty()) { return; } BinaryWebSocketFrame frame = new BinaryWebSocketFrame(message); message.retain(); channelGroup.writeAndFlush(frame); } static public void destory() { if (channelGroup == null || channelGroup.isEmpty()) { return; } channelGroup.close(); } }这样做是可以播放的。只是里面含有很多HTTP头部字节。至于把HTTP头部都去掉能不能行,没有尝试。因为无论转发时是否去掉,在接收的时候同样是多了很多的开销,所以后面我转到UDP方式的推流。
【UDP推流处理】
UDP方式的server要用NioDatagramChannel,也不需要编解码模块,直接配上处理Handler
EventLoopGroup bossLoop = null; try { bossLoop = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioDatagramChannel.class); bootstrap .group(bossLoop) .option(ChannelOption.SO_BROADCAST, true) // 支持广播 .option(ChannelOption.SO_SNDBUF, 1024 * 256) .option(ChannelOption.SO_RCVBUF, 1024 * 256); bootstrap .handler(new UdpMpegTsHandler()); ChannelFuture future = bootstrap.bind(port).sync(); if (future.isSuccess()) { System.out.println("UDP stream server start at port: " + port + "."); } future.channel().closeFuture().await(); } catch (Exception e) { } finally { if (bossLoop != null) { bossLoop.shutdownGracefully(); } }
Handler也超级简单,唯一需要注意的是Handler接收的数据类型不再是ByteBuf,而是DatagramPacket
public class UdpMpegTsHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { PlayerGroup.broadCast(msg.content()); } }同样的是转发到group里面去广播。唯一需要注意的是要取出DatagramPacket中的ByteBuf
【websocket】
websocket阶段和之前写的ws稍微一点不一样。这里是用ws传输二进制,所以ws的数据格式是BinaryWebSocketFrame。Server加载的编解码器看起来像这样:
bootstrap.localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("readTimeout", new ReadTimeoutHandler(45)); // 长时间不写会断 ch.pipeline().addLast("HttpServerCodec", new HttpServerCodec()); ch.pipeline().addLast("ChunkedWriter", new ChunkedWriteHandler()); ch.pipeline().addLast("HttpAggregator", new HttpObjectAggregator(65535)); ch.pipeline().addLast("WsProtocolHandler", new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true)); ch.pipeline().addLast("WsBinaryDecoder", new WebSocketFrameDecoder()); // ws解码成字节 ch.pipeline().addLast("WsEncoder", new WebSocketFramePrepender()); // 字节编码成ws ch.pipeline().addLast(new VideoPlayerHandler()); } });
WebSocketFrameDecoder是自定义的解码器。WebSocketFramePrepender是自定义的编码器。VideoPlayerHandler是自定义处理Handler。
先看解码器WebSocketFrameDecoder,这里我用直接内存了。
public class WebSocketFrameDecoder extends MessageToMessageDecoder<WebSocketFrame> { @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { ByteBuf buff = msg.content(); byte[] messageBytes = new byte[buff.readableBytes()]; buff.readBytes(messageBytes); // TODO: 直接内存小心 ByteBuf bytebuf = PooledByteBufAllocator.DEFAULT.buffer(); // 直接内存 bytebuf.writeBytes(messageBytes); out.add(bytebuf.retain()); } }再看编码器,按之前说的编码成二进制流,而不是字符流
public class WebSocketFramePrepender extends MessageToMessageEncoder<ByteBuf> { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { WebSocketFrame webSocketFrame = new BinaryWebSocketFrame(msg); out.add(webSocketFrame); } }最后的Handler也超级简单,就是把channel加到group里面
public class VideoPlayerHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ServerLogger.log("ws 连接 ctx:" + ctx); PlayerGroup.addChannel(ctx.channel()); } }
这样就通过ChannelGroup打通了UDP到ws的通道。
【结束语】
UDP的效果比较好,推HTTP的时候经常花屏,最后附带说一下jsmpeg的选项,如果播放效果不好,需要调整jsmpeg的选项。
<!DOCTYPE html> <html> <head> <title>JSMpeg Stream Client</title> <style type="text/css"> html, body { background-color: #111; text-align: center; } </style> </head> <body> <canvas id="video-canvas"></canvas> <script type="text/javascript" src="js/jsmpeg.min.js"></script> <script type="text/javascript"> var canvas = document.getElementById('video-canvas'); var url = 'ws://'+'127.0.0.1:9092/wsEntry'; var config = { canvas: canvas, autoplay: true, audio: false, video: true, protocols: 'haofei', }; var player = new JSMpeg.Player(url, config); </script> </body> </html>
注意config里面的protocols,记得和ws的server里面配置一致
ch.pipeline().addLast("WsProtocolHandler", new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "haofei", true));这个子协议可以用来做鉴权神马的,不过记得不能填"" 即空字符串,因为在Netty里不把空字符串当成子协议。这里不知道算不算Netty的bug。
相关文章推荐
- 【Netty4 简单项目实践】十四、用SpringBoot加载Netty
- 【Netty4 简单项目实践】七、上线前准备--内存泄漏监控、系统监控
- 【Netty4 简单项目实践】二、解决TCP连包问题:分隔符解码器
- 【Netty4 简单项目实践】六、断掉未鉴权的TCP长连接--ChannelHandelContext中的定时器用法
- 【Netty4 简单项目实践】十二、客户端连接池-压测利器
- 【Netty4 简单项目实践】五、Netty4接收HTTP文件上传
- 【Netty4 简单项目实践】十、Http协议下使用protocol buff
- 【Netty4 简单项目实践】三、压缩消息体:使用google的protocol buff
- 【Netty4 简单项目实践】八、转发服务的实现方案
- 【Netty4 简单项目实践】十三、WebSocket Over ProtocolBuf
- 【Netty4 简单项目实践】一、长连接服务通用框架原型
- 【Netty4 简单项目实践】九、示例项目
- 【Netty4 简单项目实践】四、添加Log工具-SL4J
- 一个上线项目实践的简单易用的RecyclerView通用全能型适配器。
- ssh批量管理分发项目实战介绍与实践
- 初识Android studio与简单项目实践
- 实践项目十:爬取百度百科Python词条相关1000个页面数据(慕课简单爬虫实战)
- c语言项目实践---设计简单银行登录、存取款、改密系统
- 第七周上机实践项目:时钟类简单程序
- demo项目开发笔录(简单分页实践)