您的位置:首页 > 其它

LocalChannel in Netty

2017-01-25 14:28 99 查看

缘起

在工作中,有一个feature用到了netty的LocalChannel的功能,之前由于时间太紧没有机会总结下这个LocalChannel,今天有时间就过来写一下。

LocalChannel介绍

LcoalChannel是Netty提供的用来在同一个JVM内部实现client和server之间通信的transport。它的实现主要是通过内存里的对象作为通信介质,不会像NIO下的channel,会占用一个文件描述符;因此使用它不会影响到你系统上的打开文件数,也就不会影响到你系统所能管理的连接数了。对于在同一个JVM内部使用netty的机制进行通信的话,还是很轻量级的。

示例

工作中的使用还是比较负载的,我在这里写了一个简单的例子,下面的讨论都是参照这个例子的。

LocalServer代码示例

LocalClient代码示例

可以看到使用方式跟使用其他的channel没有什么区别,需要注意的就是,LocalChannel也实现了自己的EventLoop,所以在使用的时候还要是配套的使用相对应的LocalEventLoopGroup。

内部实现

LocalServerChannel的bind

首先要看的当然就是server启动之后是怎么监听端口的,从示例代码中可以看出,使用LocalChannel的Server端所bind的地址只是一个字符串,并不是ip+port的形式,这也说明了LocalChannel的东西是不会占用你的端口的。

当你初始化了ServerBootStrap之后就要把它绑定到这个字符串所代表的“端口”上来,那么它是怎么做的呢?我这里抓了一下它的调用栈,我觉得它已经可以说明一切了。

"localEventLoopGroup-2-1@1385" prio=10 tid=0xd nid=NA runnable
java.lang.Thread.State: RUNNABLE
at io.netty.channel.local.LocalChannelRegistry.register(LocalChannelRegistry.java:32)
at io.netty.channel.local.LocalServerChannel.doBind(LocalServerChannel.java:91)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1081)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:502)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:487)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:904)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
at io.netty.channel.local.LocalEventLoop.run(LocalEventLoop.java:33)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)


整个很简单,最后的bind操作落到了
io.netty.channel.local.LocalChannelRegistry.register
。这是一个全局的注册表,它做的事情主要就是下面这个。而
boundChannels
就是一个
ConcurrentMap


final class LocalChannelRegistry {
static LocalAddress register(
Channel channel, LocalAddress oldLocalAddress, SocketAddress localAddress) {
.....
Channel boundChannel = boundChannels.putIfAbsent(addr, channel);
.....
}
}


好了,bind既然就是往全局的注册表里注册一下,把自己监听的地址跟自己的channel放进去。简单吧。

LocalClient的connect

同样的,我把LocalClient的调用栈也抓了出来。

"Thread-1@1364" prio=5 tid=0x16 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at io.netty.bootstrap.Bootstrap.doConnect0(Bootstrap.java:160)
at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:141)
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:115)
at com.nettytest.AlexLocalClient.start(AlexLocalClient.java:32)
at com.nettytest.Demo.startLocalClient(Demo.java:22)
at com.nettytest.Demo.access$100(Demo.java:8)
at com.nettytest.Demo$2.run(Demo.java:40)
at java.lang.Thread.run(Thread.java:745)


"localEventLoopGroup-3-1@1398" prio=10 tid=0x18 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at io.netty.channel.local.LocalChannel$LocalUnsafe.connect(LocalChannel.java:337)
at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1089)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:543)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:528)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:510)
at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:909)
at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:203)
at io.netty.bootstrap.Bootstrap$2.run(Bootstrap.java:165)
at io.netty.channel.local.LocalEventLoop.run(LocalEventLoop.java:33)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)


private class LocalUnsafe extends AbstractUnsafe {

@Override
public void connect(final SocketAddress remoteAddress,
SocketAddress localAddress, final ChannelPromise promise) {
....................

if (localAddress != null) {
try {
doBind(localAddress);     // ------- #!
} catch (Throwable t) {
safeSetFailure(promise, t);
close(voidPromise());
return;
}
}

Channel boundChannel = LocalChannelRegistry.get(remoteAddress);  //---- #2
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ChannelException("connection refused");
safeSetFailure(promise, cause);
close(voidPromise());
return;
}

LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
peer = serverChannel.serve(LocalChannel.this);  //---- #3
}
}


跟LocalServer一样,我们看调用栈最上面的这个函数,它的代码就像上面这个。前面有一些状态检查的条件,我们先暂时不关心。

注释1处的意思是,我要将LocalClient的channel注册到全局的注册表中,这个主要是为了方便连接成功后,server同样可以找到client来发送数据。这里需要注意的是,实际上具体到某一次数据交换来看,server跟client的角色是不停的在互换的,也就是发送者跟接受者的角色是不停的互换的。为了实现这种双向的通信,通信的双方都在注册表面进行注册也是必须的。

在注释2处,client从注册表中取出server的channel,然后在注释3处将serverChannel跟clientChannel联系起来。下面我们就来看这个server.serve()。

public class LocalServerChannel extends AbstractServerChannel {
......
LocalChannel serve(final LocalChannel peer) {
final LocalChannel child = new LocalChannel(this, peer);
if (eventLoop().inEventLoop()) {
serve0(child);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
serve0(child);
}
});
}
return child;
}
......


通过创建一个新的LocalChannel来讲自己跟peer关联起来,这里的peer就是client端的channel。然后把这个新创建的channel返回给client端,作为client端的peer。这样client的读写都是通过这个channel来进行了。

需要注意的是,这里是新建的LocalChannel跟原来的那个server bind返回的那个channel不是同一个。这个也是为了同一个server channel只是用来监听,accept后会产生很多的channel,同时服务它们。

LocalChannel的write

对于write,我们就从LocalClient分析,实际上在LocalServer那边的写跟这个也是类似的。

参考代码

同样的,抓了调用栈出来

"localEventLoopGroup-3-1@1373" prio=10 tid=0x18 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at io.netty.channel.local.LocalChannel.doWrite(LocalChannel.java:278)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:750)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:719)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1119)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:735)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:753)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:788)
at com.nettytest.AlexLocalClientHandler.channelActive(AlexLocalClientHandler.java:13)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:212)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:198)
at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:818)
at io.netty.channel.local.LocalChannel$3.run(LocalChannel.java:178)
at io.netty.channel.local.LocalEventLoop.run(LocalEventLoop.java:33)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)


同样的,把焦点放在栈顶的这个函数

public class LocalChannel extends AbstractChannel {
......
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (state < 2) {
throw new NotYetConnectedException();
}
if (state > 2) {
throw new ClosedChannelException();
}

final LocalChannel peer = this.peer;
final ChannelPipeline peerPipeline = peer.pipeline();
final EventLoop peerLoop = peer.eventLoop();

if (peerLoop == eventLoop()) {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
peer.inboundBuffer.add(msg);
ReferenceCountUtil.retain(msg);
in.remove();
}
finishPeerRead(peer, peerPipeline);
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
}

peerLoop.execute(new Runnable() {
@Override
public void run() {
Collections.addAll(peer.inboundBuffer, msgsCopy);
finishPeerRead(peer, peerPipeline);
}
});
}
}
.....
}


整个函数很简单,主要就是那个if…else条件。

如果peer跟我当前的使用的是同一个eventloop,那么我就直接放到它的inboundBuffer里面

如果不是同一个eventloop,我就要把要写过去的buffer拷贝一份,通过异步任务的方式放入peerLoop。为什么这里需要拷贝ByteBuf呢,就是因为要放入eventLoop异步执行嘛,原来的ByteBuf很可能在这个异步任务执行前就被释放掉了(GC掉了)。

需要注意的地方

从上面的分析来看,LocalChannel是依赖于这个中央的注册表的,而这个注册表又是以channel的id作为key的,那么这个channel id的唯一性就非常重要,如果local client的数目非常多,那么就有可能发生channel id冲突的情况,导致你的channel 在注册表中注册失败,继而导致connect失败或者是bind失败。

其实netty的作者也注意到了这个问题,给出了解决方案。注意到这个PR的target版本号是5.0.0.Alpha1,而作者另外的一个backporttarget是4.1.0.Beta1。

如果你需要在你的代码中使用LocalChannel,注意一下这两个PR还是有必要。但是如果你真的不太方便升级你的netty版本的话,解决方案就是在connect或者bind的地方加入一定的重试机制,当然需要怎么样的重试还要根据你的业务场景来看啦 :)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty