您的位置:首页 > 其它

dubbo源码分析系列——dubbo-rpc-default模块源码分析

2016-05-31 13:58 525 查看

简化类图



从图中可以看出该模块下的类主要是实现了dubbo-rpc-api和dubbo-remoting-api两个模块中定义的一些接口和抽象类。扩展了一种duubo框架自定义的dubbo协议,包括编解码和方法调用处理等。

DubboProtocol

该类是抽象协议实现类AbstractProtocol的具体的dubbo协议的实现,从该类开始着手分析。

发布服务方法export的实现

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service. 通过url获得该服务的key。格式如:{serviceGroup}/{serviceName}:{serviceVersion}:{port}
String key = serviceKey(url);
//Dubbo协议实现的服务发布器。
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

//export an stub service for dispaching event
//参数STUB_EVENT_KEY和IS_CALLBACK_SERVICE的含义不太清楚,需要后续深究。
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}

//调用打开服务器绑定url的方法,这个地方是核心,需要进入深究。
openServer(url);

return exporter;
}

该方法实现了dubbo协议的服务发布,显示构造一个DubboExporter实现类的Exporter,用于返回。最核心的是调用内部方法openServer(url);将该url发布到dubbo服务器上。我们进入该方法看看。

private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}

该方法是获得url的地址,通过地址找到对应的server,若已经有相同的地址则无需构造新的server,只需要直接使用,只就起到了缓存server的作用,避免重复构建server。若已经找到了该地址,则会调用server.reset(url)重置一下。url中的参数Constants.IS_SERVER_KEY参数可以禁止发布远程服务,只能本地调用。具体意义不是十分清楚。继续进入方法:createServer(url)

private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);

url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}

该方法先增加了一些默认的参数,比如heartbeat、server等。检查参数的合法性。最后调用Exchangers.bind(url, requestHandler)将url绑定到requestHandler上获得绑定的服务器,Exchangers是网络通讯模块dubbo-remoting-api中定义的,详细的含义,等我们分析该模块再了解。我们猜测该方法的含义是绑定url的处理器为requestHandler,并返回服务器。requestHandler就是如何处理接收的请求,这个地方是核心,我们进入该对象的定义看看。

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}

@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}

@Override
public void disconnected(Channel channel) throws RemotingException {
if(logger.isInfoEnabled()){
logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}

private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}

private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)){
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};

该对象是一个匿名类对象,实现了接口ExchangeHandler,应该就是一个远程通讯的抽象,是一个通讯处理类,处理接收到信息。其中方法reply是响应客户端请求信息,它根据Invocation对象获得invoker,最后再调用invoker.invoke方法执行目标对象的方法,将返回结果发回给客户端。其它的几个事件的方法也做了响应的处理,包括:received、connected和disconnected等事件。

引用服务方法refer的实现

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}

ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect){
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}

/**
*获取共享连接
*/
private ExchangeClient getSharedClient(URL url){
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
if ( !client.isClosed()){
client.incrementAndGetCount();
return client;
} else {
//                logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client));
referenceClientMap.remove(key);
}
}
ExchangeClient exchagneclient = initClient(url);

client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}

/**
* 创建新连接.
*/
private ExchangeClient initClient(URL url) {

// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

// BIO存在严重性能问题,暂时不允许使用
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}

ExchangeClient client ;
try {
//设置连接应该是lazy的
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
client = new LazyConnectExchangeClient(url ,requestHandler);
} else {
client = Exchangers.connect(url ,requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url
+ "): " + e.getMessage(), e);
}
return client;
}

该方法先直接构造一个DubboInvoker类型的对象,其中获取客户端的参数调用了方法getClients(url)。看是否配置了参数connections,若未配置或配置为0则表示共享客户端连接,如果不共享则直接创建一个新的客户端对象,否则获得已经共享的连接,并且返回一个创建包装器ReferenceCountExchangeClient的客户端实例,该实例会记录被引用次数,最终的方法还是调用目标的client对象。

初始化一个全新的Client对象的方法是核心,它也是先配置一些默认的参数,如果配置参数lazy则表示延迟创建客户端连接,则直接返回一个LazyConnectExchangeClient对象,该对象也是ExchangeClient的包装器对象,它会在请求的时候先检查连接,若未创建连接则会先创建连接。最后调用client = Exchangers.connect(url ,requestHandler);将url绑定到请求处理器requestHandler上。

DubboInvoker

该类是消费者dubbo协议的执行器。它处理了dubbo协议在客户端调用远程接口的逻辑实现。核心方法是doInvoke,我们重点看看这个方法的实现。

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);

ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

首先客户端支持多个连接调用服务,这个可以通过参数设置,会轮询连接去调用服务。支持三种调用方式,分别是oneway(单向调用)、async(异步)和sync(同步),这个都是可以通过url参数指定,通过客户端对应的方法去调用服务端的服务。

next

可以看出来,我们有大量的接口和抽象类来自于dubbo-remoting-api模块,我们的疑问都在这里,接下来我们研究该模块才能解决我们的很多疑惑。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: