搞懂Dubbo服务发布与服务注册
一.前言
本文讲服务发布与服务注册,服务提供者本地发布服务,然后向注册中心注册服务,将服务实现类以服务接口的形式提供出去,以便服务消费者从注册中心查阅并调用服务。
本文源码分析基于org.apache.dubbo:dubbo:2.7.2,服务端代码例子是上文的例子
如果没有Dubbo SPI的基础知识,建议先看Dubbo SPI,否则源码怎么跳转的将毫无头绪
Dubbo SPI:https://www.geek-share.com/detail/2770652300.html
二.服务发布
调用顺序
首先讲一下大致的服务发布的调用顺序图,蓝色方法不分析,主要是起一个netty服务
-org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent
-org.apache.dubbo.config.ServiceConfig#export
-org.apache.dubbo.config.ServiceConfig#doExport
-org.apache.dubbo.config.ServiceConfig#doExportUrls
-org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
-org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
-org.apache.dubbo.registry.integration.RegistryProtocol#export
-org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
-org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
-org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
-org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
-org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
-org.apache.dubbo.remoting.transport.netty.NettyTransporter#bind
...
源码分析
1.org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent,这个方法就不多说了,注释写的很详细了
/** * ServiceBean实现了ApplicationListener接口 * 在IOC的容器的启动过程,当所有的bean都已经处理完成之后,spring ioc容器会发布ContextRefreshedEvent事件。 * 此处就是接收到事件处理的逻辑,开始服务发布之旅 * * @param event */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 是否已发布 && 是否已经被取消发布 if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 发布 export(); } }
2.org.apache.dubbo.config.ServiceConfig#export,这里的checkAndUpdateSubConfigs主要做的就是检测标签合法,检测各种对象是否为空,为空则创建。之后判断是否可以发布和是否需要延迟发布,需要则延迟再doExport
public synchronized void export() { // 检测<dubbo:service>的interface是否合法 // 检查provider为空 // 检查各种对象是否为空,为空则创建 checkAndUpdateSubConfigs(); if (!shouldExport()) { return; } // 是否延迟? if (shouldDelay()) { // 延迟 DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { doExport(); } }
3.org.apache.dubbo.config.ServiceConfig#doExport,注释写的很清楚,还是没有走到核心逻辑
protected synchronized void doExport() { // 是否已经被取消发布 if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } // 是否已经发布,注意这个变量volatile修饰 if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); }
4.org.apache.dubbo.config.ServiceConfig#doExportUrls,先加载要注册的url,然后遍历所有协议,发布服务并注册
@SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { // 加载要注册的url // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&pid=10516®istry=zookeeper×tamp=1559889491339 List<URL> registryURLs = loadRegistries(true); // for循环每个协议,发布服务并注册到注册中心 for (ProtocolConfig protocolConfig : protocols) { String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); ApplicationModel.initProviderModel(pathKey, providerModel); doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
5.org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol,看方法名字也知道,单一协议多导出服务。这个方法做的事情比较多,我这边拆成4个部分,第一部分、第二部分和第三部分都是在做填充map的事情,第三部分最后生成导出url。第四部分开始发布服务,里面会判断到底是发布服务并且注册到注册中心呢还是仅发布服务。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { /** 第一部分开始 **/ String name = protocolConfig.getName(); // 为空或者为空字符串,默认dubbo协议 if (StringUtils.isEmpty(name)) { name = DUBBO; } Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, provider); appendParameters(map, protocolConfig); appendParameters(map, this); // 上面的代码就是将版本,方法,各种配置放到map里去 // 这里给出debug的时候的map对象 // map.toString() = {side=provider, application=dubbo-server, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368} /** 第一部分结束 **/ /** 第二部分开始 **/ // 这段if里做的事情主要是检测<dubbo:method> 标签中的配置信息,填充map if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { // 添加MethodConfig到map中,key=方法名.属性 value=属性值 // ex sayHello.retries:2 appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } // 获取ArgumentConfig列表 List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { // 添加ArgumentConfig信息到map中 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } /** 第二部分结束 **/ /** 第三部分开始 **/ if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // 生成包装类 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 添加方法到map中 if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 添加token if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // export service // 此处map.toString() = {side=provider, application=dubbo-server, methods=hello, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368} String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 此处url=dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.grimmjx.edu.HelloService&methods=hello&pid=11917&side=provider&timeout=100×tamp=1559973693109 URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } /** 第三部分结束 **/ /** 第四部分开始 **/ // 开始发布服务 String scope = url.getParameter(SCOPE_KEY); // don't export when none is configured // //配置为none不暴露 if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) // 配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) // 如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务) if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (!isOnlyInJvm() && logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (CollectionUtils.isNotEmpty(registryURLs)) { // 发布服务 for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // 生成Invoker // Invoker是十分重要的对象,可向它发起invoke调用 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // 持有this和invoker // 此处的invoker.getUrl()=registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D10738%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559895851366&pid=10738®istry=zookeeper×tamp=1559895851283 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 发布,并生成Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } // 没有注册中心,仅发布服务 } else { Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); /** 第四部分结束 **/ }
6.org.apache.dubbo.registry.integration.RegistryProtocol#export,为什么是RegistryProtocol?注释里写的很清楚了,这里的url是registry://开头的。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 获取注册地址 URL registryUrl = getRegistryUrl(originInvoker); // url to export locally // 获取provider url URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 要注册的url providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker // 导出服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry
.... }
之后上面调用顺序图的蓝色部分的代码不做分析,主要是创建一个NettyServer(默认)。可自行研究
三.服务注册
调用顺序
首先讲一下大致的服务注册的调用顺序图,我们只分析红色部分。
-org.apache.dubbo.registry.integration.RegistryProtocol#register
-org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
-org.apache.dubbo.registry.support.FailbackRegistry#register
-org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
源码分析
1.org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry,具体还要看下一步
@Override public Registry getRegistry(URL url) { url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = url.toServiceStringWithoutResolving(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { // 缓存中获取 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //create registry by spi/ioc // 用Dubbo SPI创建Registry registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } // 写入缓存 REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } }
2.org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry,创建一个ZooKeeperRegistry实例并返回
@Override public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }
3.org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry,主要做的就是利用zk创建Zk客户端
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(state -> { if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } }); }
4.org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister,连接好zk后,是不是就要创建服务提供者的节点了?所以这一步就是注册服务
@Override public void doRegister(URL url) { try { // toUrlPath(url) = /dubbo/com.grimmjx.edu.HelloService/providers/dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D11917%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559973693109 zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
这一步之后,我们用zkCli来连接zk看一下节点数据
- 用zookeeper作为Dubbo的注册中心发布服务
- dubbo服务启动注册报 java.net.UnknownHostException 未知的名称或服务解决办法
- 商城项目-dubbo,框架整合,dubbo发布和引用服务
- [ZooKeeper.net] 1 模仿dubbo实现一个简要的http服务的注册 基于webapi
- dubbo服务只注册不订阅
- dubbo的服务注册发现是怎么实现的.
- Dubbo中多注册中心问题与服务分组
- maven中dubbo的发布服务与使用服务
- Spring Boot + Dubbo 可运行的例子源码-实现服务注册和远程调用
- 外网ip注册dubbo服务
- 8. Dubbo原理解析-服务发布
- 淘宝SOA框架dubbo学习(2)--搭建Zookeeper注册中心服务
- [ZooKeeper.net] 1 模仿dubbo实现一个简要的http服务的注册 基于webapi
- NET(C#)接入Dubbo服务,Zookeeper作为Dubbo服务的注册中心,实现thrift协议访问接口(3)
- 被事务代理的spring service 不能使用注解方式发布dubbo服务
- 使用dubbo+zookeeper发布服务项目实践
- 学习淘淘商城第十二课(发布dubbo服务)
- Docker dubbo 服务注册
- 一台服务器发布多个tomcat并注册服务名办法
- Dubbo_异常_服务注册运行正常但是Dubbo-Admin看不到服务(亲测可用)