LocalChannel in Netty
2017-01-25 14:28
99 查看
缘起
在工作中,有一个feature用到了netty的LocalChannel的功能,之前由于时间太紧没有机会总结下这个LocalChannel,今天有时间就过来写一下。LocalChannel介绍
LcoalChannel是Netty提供的用来在同一个JVM内部实现client和server之间通信的transport。它的实现主要是通过内存里的对象作为通信介质,不会像NIO下的channel,会占用一个文件描述符;因此使用它不会影响到你系统上的打开文件数,也就不会影响到你系统所能管理的连接数了。对于在同一个JVM内部使用netty的机制进行通信的话,还是很轻量级的。示例
工作中的使用还是比较负载的,我在这里写了一个简单的例子,下面的讨论都是参照这个例子的。LocalServer代码示例
LocalClient代码示例
可以看到使用方式跟使用其他的channel没有什么区别,需要注意的就是,LocalChannel也实现了自己的EventLoop,所以在使用的时候还要是配套的使用相对应的LocalEventLoopGroup。
内部实现
LocalServerChannel的bind
首先要看的当然就是server启动之后是怎么监听端口的,从示例代码中可以看出,使用LocalChannel的Server端所bind的地址只是一个字符串,并不是ip+port的形式,这也说明了LocalChannel的东西是不会占用你的端口的。当你初始化了ServerBootStrap之后就要把它绑定到这个字符串所代表的“端口”上来,那么它是怎么做的呢?我这里抓了一下它的调用栈,我觉得它已经可以说明一切了。
"localEventLoopGroup-2-1@1385" prio=10 tid=0xd nid=NA runnable java.lang.Thread.State: RUNNABLE at io.netty.channel.local.LocalChannelRegistry.register(LocalChannelRegistry.java:32) at io.netty.channel.local.LocalServerChannel.doBind(LocalServerChannel.java:91) at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485) at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1081) at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:502) at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:487) at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:904) at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198) at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348) at io.netty.channel.local.LocalEventLoop.run(LocalEventLoop.java:33) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745)
整个很简单,最后的bind操作落到了
io.netty.channel.local.LocalChannelRegistry.register。这是一个全局的注册表,它做的事情主要就是下面这个。而
boundChannels就是一个
ConcurrentMap。
final class LocalChannelRegistry { static LocalAddress register( Channel channel, LocalAddress oldLocalAddress, SocketAddress localAddress) { ..... Channel boundChannel = boundChannels.putIfAbsent(addr, channel); ..... } }
好了,bind既然就是往全局的注册表里注册一下,把自己监听的地址跟自己的channel放进去。简单吧。
LocalClient的connect
同样的,我把LocalClient的调用栈也抓了出来。"Thread-1@1364" prio=5 tid=0x16 nid=NA runnable java.lang.Thread.State: RUNNABLE at io.netty.bootstrap.Bootstrap.doConnect0(Bootstrap.java:160) at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:141) at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:115) at com.nettytest.AlexLocalClient.start(AlexLocalClient.java:32) at com.nettytest.Demo.startLocalClient(Demo.java:22) at com.nettytest.Demo.access$100(Demo.java:8) at com.nettytest.Demo$2.run(Demo.java:40) at java.lang.Thread.run(Thread.java:745)
"localEventLoopGroup-3-1@1398" prio=10 tid=0x18 nid=NA runnable java.lang.Thread.State: RUNNABLE at io.netty.channel.local.LocalChannel$LocalUnsafe.connect(LocalChannel.java:337) at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1089) at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:543) at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:528) at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:510) at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:909) at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:203) at io.netty.bootstrap.Bootstrap$2.run(Bootstrap.java:165) at io.netty.channel.local.LocalEventLoop.run(LocalEventLoop.java:33) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745)
private class LocalUnsafe extends AbstractUnsafe { @Override public void connect(final SocketAddress remoteAddress, SocketAddress localAddress, final ChannelPromise promise) { .................... if (localAddress != null) { try { doBind(localAddress); // ------- #! } catch (Throwable t) { safeSetFailure(promise, t); close(voidPromise()); return; } } Channel boundChannel = LocalChannelRegistry.get(remoteAddress); //---- #2 if (!(boundChannel instanceof LocalServerChannel)) { Exception cause = new ChannelException("connection refused"); safeSetFailure(promise, cause); close(voidPromise()); return; } LocalServerChannel serverChannel = (LocalServerChannel) boundChannel; peer = serverChannel.serve(LocalChannel.this); //---- #3 } }
跟LocalServer一样,我们看调用栈最上面的这个函数,它的代码就像上面这个。前面有一些状态检查的条件,我们先暂时不关心。
注释1处的意思是,我要将LocalClient的channel注册到全局的注册表中,这个主要是为了方便连接成功后,server同样可以找到client来发送数据。这里需要注意的是,实际上具体到某一次数据交换来看,server跟client的角色是不停的在互换的,也就是发送者跟接受者的角色是不停的互换的。为了实现这种双向的通信,通信的双方都在注册表面进行注册也是必须的。
在注释2处,client从注册表中取出server的channel,然后在注释3处将serverChannel跟clientChannel联系起来。下面我们就来看这个server.serve()。
public class LocalServerChannel extends AbstractServerChannel { ...... LocalChannel serve(final LocalChannel peer) { final LocalChannel child = new LocalChannel(this, peer); if (eventLoop().inEventLoop()) { serve0(child); } else { eventLoop().execute(new Runnable() { @Override public void run() { serve0(child); } }); } return child; } ......
通过创建一个新的LocalChannel来讲自己跟peer关联起来,这里的peer就是client端的channel。然后把这个新创建的channel返回给client端,作为client端的peer。这样client的读写都是通过这个channel来进行了。
需要注意的是,这里是新建的LocalChannel跟原来的那个server bind返回的那个channel不是同一个。这个也是为了同一个server channel只是用来监听,accept后会产生很多的channel,同时服务它们。
LocalChannel的write
对于write,我们就从LocalClient分析,实际上在LocalServer那边的写跟这个也是类似的。参考代码
同样的,抓了调用栈出来
"localEventLoopGroup-3-1@1373" prio=10 tid=0x18 nid=NA runnable java.lang.Thread.State: RUNNABLE at io.netty.channel.local.LocalChannel.doWrite(LocalChannel.java:278) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:750) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:719) at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1119) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:735) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:765) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:753) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:788) at com.nettytest.AlexLocalClientHandler.channelActive(AlexLocalClientHandler.java:13) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:212) at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198) at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:818) at io.netty.channel.local.LocalChannel$3.run(LocalChannel.java:178) at io.netty.channel.local.LocalEventLoop.run(LocalEventLoop.java:33) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745)
同样的,把焦点放在栈顶的这个函数
public class LocalChannel extends AbstractChannel { ...... @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { if (state < 2) { throw new NotYetConnectedException(); } if (state > 2) { throw new ClosedChannelException(); } final LocalChannel peer = this.peer; final ChannelPipeline peerPipeline = peer.pipeline(); final EventLoop peerLoop = peer.eventLoop(); if (peerLoop == eventLoop()) { for (;;) { Object msg = in.current(); if (msg == null) { break; } peer.inboundBuffer.add(msg); ReferenceCountUtil.retain(msg); in.remove(); } finishPeerRead(peer, peerPipeline); } else { // Use a copy because the original msgs will be recycled by AbstractChannel. final Object[] msgsCopy = new Object[in.size()]; for (int i = 0; i < msgsCopy.length; i ++) { msgsCopy[i] = ReferenceCountUtil.retain(in.current()); in.remove(); } peerLoop.execute(new Runnable() { @Override public void run() { Collections.addAll(peer.inboundBuffer, msgsCopy); finishPeerRead(peer, peerPipeline); } }); } } ..... }
整个函数很简单,主要就是那个if…else条件。
如果peer跟我当前的使用的是同一个eventloop,那么我就直接放到它的inboundBuffer里面
如果不是同一个eventloop,我就要把要写过去的buffer拷贝一份,通过异步任务的方式放入peerLoop。为什么这里需要拷贝ByteBuf呢,就是因为要放入eventLoop异步执行嘛,原来的ByteBuf很可能在这个异步任务执行前就被释放掉了(GC掉了)。
需要注意的地方
从上面的分析来看,LocalChannel是依赖于这个中央的注册表的,而这个注册表又是以channel的id作为key的,那么这个channel id的唯一性就非常重要,如果local client的数目非常多,那么就有可能发生channel id冲突的情况,导致你的channel 在注册表中注册失败,继而导致connect失败或者是bind失败。其实netty的作者也注意到了这个问题,给出了解决方案。注意到这个PR的target版本号是5.0.0.Alpha1,而作者另外的一个backporttarget是4.1.0.Beta1。
如果你需要在你的代码中使用LocalChannel,注意一下这两个PR还是有必要。但是如果你真的不太方便升级你的netty版本的话,解决方案就是在connect或者bind的地方加入一定的重试机制,当然需要怎么样的重试还要根据你的业务场景来看啦 :)
相关文章推荐
- netty ChannelInboundByteHandlerAdapter
- Netty in action—ChannelHandler和ChannelPipeline
- netty ChannelInboundHandlerAdapter 使用注意事项
- Netty 源码分析之SimpleChannelInboundHandler
- netty之SimpleChannelInboundHandler
- Netty 的 inbound 与 outbound, 以及 InboundHandler 的 channelInactive 与 OutboundHandler 的 close
- Netty 的 inbound 与 outbound, 以及 InboundHandler 的 channelInactive 与 OutboundHandler 的 close
- Netty in Action (十六) 第六章节 第二部分 ChannelHandlerContext和异常处理
- Netty之SimpleChannelInboundHandler
- 关于Netty in active中第二章创建的服务端Handler不继承SimpleChannelInboundHandler的原因
- 一起学Netty(三)之 SimpleChannelInboundHandler
- Netty中的SimpleChannelInboundHandler的通信过程
- 《Netty in Action》chapter 6 : ChannelHandler and ChannelPipeline
- 一起学Netty(三)之 SimpleChannelInboundHandler
- java netty之ChannelInboundByteHandlerAdapter
- netty的ChannelInboundHandler接口
- 一起学Netty(三)之 SimpleChannelInboundHandler
- Netty3 源码分析 - ChannelStateEvent
- Play-akka-spark 错误 p.nettyException - Exception caught in Netty
- Maven was cached in the local repository, resolution will not be reattempted until the update interv