您的位置:首页 > Web前端

5.[dubbo源码解析]-[配置][详解]org.apche.dubbo.config.ReferenceConfig-服务消费者引用服务配置类

2018-11-23 17:24 405 查看

1.ReferenceConfig—结构图

2.消费者api应用示例

@From:《dubbo文档-配置-API配置

3.ReferenceConfig-属性

private static final long serialVersionUID = -5864351140409987595L;
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private final List<URL> urls = new ArrayList<URL>();
// interface name
/**
* 接口名称
*/
private String interfaceName;
/**
* 接口类
*/
private Class<?> interfaceClass;
/**
*异步接口类
*/
private Class<?> asyncInterfaceClass;
// client type
/**
* 客户端类型
*/
private String client;
// url for peer-to-peer invocation
/**
* 点对点直连服务提供地址
*/
private String url;
// method configs
/**
* 方法配置
*/
private List<MethodConfig> methods;
// default config
/**
* 缺省consumer参数
*/
private ConsumerConfig consumer;
/**
* 协议类型
*/
private String protocol;
// interface proxy reference

private transient volatile T ref;
private transient volatile Invoker<?> invoker;
private transient volatile boolean initialized;
private transient volatile boolean destroyed;

4.ReferenceConfig#get()入口

/**
*  消费者应用入口
*/
public synchronized T get() {
// 已销毁,不可获得 @sjt 1
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
//进行初始化操作 @sjt 2
init();
}
return ref; // @sjt 3
}
  • @sjt1: 若已经销毁( destroyed = true ),抛出异常。
  • @sjt2: 若未初始化,调用 #init() 方法,进行初始化。
  • @sjt3: 返回引用服务。

<fontcolor=#336699 id=“ReferenceConfig”>5.ReferenceConfig#init()

5.1.检查必填项和预配置项

//检测是否初始化 如果初始化直接返回 没有初始化将flag标识:initialized设置为true;
if (initialized) {
return;
}
initialized = true;
//检测接口名必填问题 接口名为null抛异常
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// get consumer's global configuration
// 拼接属性配置(环境变量 + properties 属性)到 ConsumerConfig 对象
checkDefault();
// 拼接属性配置(环境变量 + properties 属性)到 ReferenceConfig 对象
appendProperties(this);
//若未设置 `generic` 属性,使用 `ConsumerConfig.generic` 属性。
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
// 实现泛化接口
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
// 普通接口的实现
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 校验接口和方法
checkInterfaceAndMethods(interfaceClass, methods);
}

5.2.直连提供者设置

@From:《dubbo文档-示例-直连提供者

//直连提供者 @From:[dubbo文档-示例-直连提供者](http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html)
//【第一优先级】在JVM 启动参数中加入-D参数映射服务地址 eg:java -Dcom.alibaba.xxx.XxxService=dubbo://localhost:20890
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
//【第二优先级】通过文件映射,例如 com.alibaba.xxx.XxxService=dubbo://localhost:20890
if (resolve == null || resolve.length() == 0) {
// 默认先加载,`${user.home}/dubbo-resolve.properties` 文件 ,无需配置
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
}
}
// 存在 resolveFile ,则进行文件读取加载。
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if (null != fis) {
fis.close();
}
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
resolve = properties.getProperty(interfaceName);
}
}
//设置直连提供者的url
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
//会打印警告日志 不会抛异常
if (resolveFile != null) {
logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
}
}
}

5.3.填充配置项,创建消费者实例

// 从 ConsumerConfig 对象中,读取 application、module、registries、monitor 配置对象。
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
// 从 ModuleConfig 对象中,读取 registries、monitor 配置对象。
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
// 从 ApplicationConfig 对象中,读取 registries、monitor 配置对象。
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// 校验 ApplicationConfig 配置。
checkApplication();
// 校验 Stub 和 Mock 相关的配置
checkStubAndMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
resolveAsyncInterface(interfaceClass, map);
// 将 `side`,`dubbo`,`timestamp`,`pid` 参数,添加到 `map` 集合中。
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}

String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
// 将各种配置对象,添加到 `map` 集合中。
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
// 获得服务键,作为前缀
String prefix = StringUtils.getServiceKey(map);
// 将 MethodConfig 对象数组,添加到 `map` 集合中。
if (methods != null && !methods.isEmpty()) {
Map<Object, Object> attributes = new HashMap<Object, Object>();
for (MethodConfig method : methods) {
// 将 MethodConfig 对象,添加到 `map` 集合中。
appendParameters(map, method, method.getName());
// 当 配置了 `MethodConfig.retry = false` 时,强制禁用重试
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
//将带有 @Parameter(attribute = true) 配置对象的属性,添加到参数集合。用于【事件通知】
appendAttributes(attributes, method, prefix + "." + method.getName());
// 检查属性集合中的事件通知方法是否正确。若正确,进行转换。
checkAndConvertImplicitConfig(method, map, attributes);
}
//attributes are stored by system context.
// 添加到 StaticContext 进行缓存
StaticContext.getSystemContext().putAll(attributes);
}
// 以系统环境变量( DUBBO_IP_TO_REGISTRY ) 作为服务注册地址 获取注册ip
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//根据上下文创建服务实例
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

6.ReferenceConfig#createProxy()

6.1.判断injvm 是否从当前JVM中查找引用的实例

URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference 指定URL的情况下,不做本地引用
isJvmRefer = false;
} else {
// by default, reference local service if there is
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
isJvmRefer = isInjvm();
}

6.2.获取合适的url,填充到Infoker中

//从本地用用服务
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
//从远程获取服务
} else {
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}

if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url 使用最后一个registry url
}
}
if (registryURL != null) { // registry url is available 如果注册url不为空
// use AvailableCluster only when register's cluster is available
// 对有注册中心的Cluster只用 有效的cluster url
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url  不是注册的url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: