您的位置:首页 > 编程语言 > Java开发

dubbo 源码学习笔记 (八) —— 远程通讯模块

2017-10-14 17:00 375 查看
欢迎访问我的个人博客休息的风

dubbo的远程通讯模块的介绍,将从两条主线分别讲述。首先是从Exchanges->Transports->Netty打开通道,建立连接;接着是MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter这样一条handler处理线;

如下图,是整个模块的类图(看不清可在新页签查看)



首先来看Exchanges类,这个类是通讯的入口,在DubboProtocol.createServer和initClient方法里分别用到Exchanges的bind和connect方法。

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//这里获取的Exchanger是HeaderExchanger
return getExchanger(url).bind(url, handler);
}

在HeaderExchanger里,bind会去实例一个HeaderExchangeServer对象。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

在这之前,会调用Transporters去bind.

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//使用NettyTransporter去bind
return getTransporter().bind(url, handler);
}

在NettyTransporter里,会去进行netty创建服务端server

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}


NettyServer会开启请求线程和处理线程,并绑定地址端口,开启服务。

protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
//两个线程池,用于请求和工作处理
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.
4000
newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
//使用适配,用DubboCodec去编码解码
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
//工作线程的处理handler
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}

这样,服务端的服务就启动了,与之相对的,是客户端与服务端建立连接。

客户端Exchanges和Transports与服务端大致相同,主要是创建NettyClient。

protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
//使用适配,用DubboCodec去编码解码
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
//工作线程的处理handler
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}

这里会与服务端的netty建立连接。这样就把客户端和服务端联系起来,并建立通道了。Exchanges->Transports->Netty打开通道,建立连接这一条线基本介绍完了。接下来我们来看下handler这条线。

在DubboProtocol里,会去创建ExchangeHandlerAdapter,并赋值给requestHandler。这个是最里层处理调用的地方。

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//从DubboExporter里获取Invoker
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
//省略代码 。。。。
}

之后,在HeaderExchanger,会对其进行包装,不论是客户端还是服务端

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
//对handler进行HeaderExchangeHandler和DecodeHandler包装
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//对handler进行HeaderExchangeHandler和DecodeHandler包装
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

之后,在创建NettyServer和NettyClient进,会再进行一层包装

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}

这里会使用ChannelHandlers.wrap进行包装

public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}

加上Dispatcher、HeartbeatHandler、MultiMessageHandler的包装。这样就把整个handler串起来了。

这篇博客主要介绍使用netty做为通讯,这也是最经典最通用的通讯方式。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息