您的位置:首页 > 其它

深入dubbo内核(3):本地服务暴露

2018-02-12 11:22 381 查看
服务发布原理

观察dubbo服务发布时的启动日志

[DUBBO] The service ready on spring started. service: com.alibaba.dubbo.demo.DemoService, dubbo version: 2.0.0, current host: 127.0.0.1
// 第一步:暴露本地服务
[DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1
// 第二步:暴露远程服务
[DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to url.........
[DUBBO] Register dubbo service com.alibaba.dubbo.demo.DemoService url..........
// 第三步:启动netty
INFO transport.AbstractServer: [DUBBO] Start NettyServer bind /0.0.0.0:20880, export /10.0.74.181:20880, dubbo version: 2.0.0, current host: 127.0.0.1
INFO zookeeper.ZookeeperRegistry:  [DUBBO] Load registry store file....
// 第四步:打开注册中心
main-SendThread(10.0.74.174:2181)  INFO zookeeper.ClientCnxn: Opening socket connection to server 10.0.74.174/10.0.74.174:2181.
// 第五步:服务注册到注册中心
[DUBBO] Register: dubbo://10.0.74.181:20880/com.alibaba.dubbo.demo.DemoService?........
// 第六步:监听注册中心
[DUBBO] Subscribe: provider://10.0.74.181:20880/com.alibaba.dubbo.demo.DemoService?..........
[DUBBO] Notify urls for subscribe url provider://10.0.74.181:20880/com.alibaba.dubbo.demo.DemoService?.........

观察以上日志,可以说dubbo的服务发布主要就是围绕以上六个步骤。而服务的的发布实际上主要就是针对一行代码:

<!-- 声明需要暴露的服务接口 -->
<dubbo:service  interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>


spring自定义标签
完成一个spring的自定义配置一般需要以下5个步骤:
1.编写XSD文件 全称就是 XML Schema 它就是校验XML,定义了一些列的语法来规范XML
2.编写NamespaceHandler和BeanDefinitionParser完成解析工作
3.编写spring.handlers和spring.schemas串联起所有部件

动态注册bean

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}

DubboNamespaceHandler是dubbo所有spring标签的入口,而ServiceBean则是服发布的入口,那ServiceBean是如何被spring容器管理的。

public class DubboBeanDefinitionParser implements BeanDefinitionParser {
public DubboBeanDefinitionParser(Class<?> beanClass, boolean required) {
this.beanClass = beanClass;
this.required = required;
}
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String id = element.getAttribute("id");
if ((id == null || id.length() == 0) && required) {
String generatedBeanName = element.getAttribute("name");
if (generatedBeanName == null || generatedBeanName.length() == 0<
4000
/span>) {
if (ProtocolConfig.class.equals(beanClass)) {
generatedBeanName = "dubbo";
} else {
generatedBeanName = element.getAttribute("interface");
}
}
if (generatedBeanName == null || generatedBeanName.length() == 0) {
generatedBeanName = beanClass.getName();
}
id = generatedBeanName;
int counter = 2;
while (parserContext.getRegistry().containsBeanDefinition(id)) {
id = generatedBeanName + (counter++);
}
}
if (id != null && id.length() > 0) {
if (parserContext.getRegistry().containsBeanDefinition(id)) {
throw new IllegalStateException("Duplicate spring bean id " + id);
}
// 注册Bean到spring容器
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
beanDefinition.getPropertyValues().addPropertyValue("id", id);
....
}
public BeanDefinition parse(Element element, ParserContext parserContext) {
return parse(element, parserContext, beanClass, required);
}
}


ServiceBean

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
private static final long serialVersionUID = 213195494150089726L;
private static transient ApplicationContext SPRING_CONTEXT;
private transient ApplicationContext applicationContext;
private transient String beanName;
private transient boolean supportedApplicationListener;
public ServiceBean() {
super();
}
public ServiceBean(Service service) {
super(service);
}
public static ApplicationContext getSpringContext() {
return SPRING_CONTEXT;
}
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
if (applicationContext != null) {
SPRING_CONTEXT = applicationContext;
try {
Method method = applicationContext.getClass().getMethod("addApplicationListener", new Class<?>[]{ApplicationListener.class}); // 兼容Spring2.0.1
method.invoke(applicationContext, new Object[]{this});
supportedApplicationListener = true;
} catch (Throwable t) {
if (applicationContext instanceof AbstractApplicationContext) {
try {
Method method = AbstractApplicationContext.class.getDeclaredMethod("addListener", new Class<?>[]{ApplicationListener.class}); // 兼容Spring2.0.1
if (!method.isAccessible()) {
method.setAccessible(true);
}
method.invoke(applicationContext, new Object[]{this});
supportedApplicationListener = true;
} catch (Throwable t2) {
}
}
}
}
}
public void setBeanName(String name) {
this.beanName = name;
}
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//发布服务
export();
}
}
}

ServiceBean实现了InitializingBean,DisposableBean,ApplicationContextAware,ApplicationListener,BeanNameAware五个接口。ApplicationListener是spring的事件监听,当beans加载时,ApplicationContext发布某些类型的事件,
例如,当上下文启动时,ContextStartedEvent发布,当上下文停止时,ContextStoppedEvent发布。通过ApplicationEvent 类和 ApplicationListener 接口来提供在 ApplicationContext 中处理事件。如果一个bean实现 ApplicationListener,那么每次 ApplicationEvent被发布到ApplicationContext上,那个bean会被通知。(ContextRefreshedEvent,当ApplicationContext被初始化或刷新时,该事件被发布)。

服务暴露
服务暴露的过程是一个非常复杂的过程,在这里由于能力有限只能分享一下几个主要的方法。
ServiceConfig

服务暴露又分为本地暴露和远程暴露:
1.本地暴露:指暴露在同一个JVM中,不用通过注册中心来做远程暴露
2.远程暴露:指暴露给远程客户端IP和端口号,通过远程通讯

-->export()
-->doExport();
-->doExportUrls();// 获取服务暴露使用的协议 默认使用tcp协议
-->loadRegistries(true);// 组装注册中心url
-->doExportUrlsFor1Protocol()// 服务暴露
-->URL url = new URL(...)// 组装服务url
-->exportLocal(url);// 暴露本地服务

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private void exportLocal(URL url) {
// 将url组装为本地暴露的方式
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(NetUtils.LOCALHOST)
.setPort(0);
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);


serviceBean调用的时候会首先调用父类构造初始化ProxyFactory,ProxyFactory的作用就是为了获取一个远程接口的代理他有两个方法:
1.getInvoker(T proxy, Class type, URL url):针对service端,将服务对象,例如DemoServiceImpl包装为一个Invoker对象
2.getProxy(Invoker invoker):针对客户端,创界接口代理对象,例如DemoService接口

@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

–通过spi获取ProxyFactory$Adaptive代理类

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
// 默认javassist
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString
cb44
() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
// 调用Invoker方法
return extension.getInvoker(arg0, arg1, arg2);
}
}

– JavassistProxyFactory

/**
* JavaassistRpcProxyFactory
*
* @author william.liangf
*/
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
/**
* @param proxy 接口实现类引用
* @param type  接口
* @param url   暴露服务的url
* @param <T>
* @return  Invoker
*/
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名</span><br>        <span class="hljs-keyword" style="font-size: inherit; line-height: inherit; margin: 0px; padding: 0px; color: rgb(248, 35, 117); word-wrap: inherit !important; word-break: inherit !important; white-space: inherit !important;">final</span> Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(<span class="hljs-string" style="font-size: inherit; line-height: inherit; margin: 0px; padding: 0px; color: rgb(238, 220, 112); word-wrap: inherit !important; word-break: inherit !important; white-space: inherit !important;">'$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}


– Wrapper:类似于BeanWapper,包装了一个接口或者一个类,可以通过wapper对一个对象进行赋值取值和指定方法的调用。
– Invoker:一个可执行对象,能根据方法的名称,参数得到相应的执行结果。最终得到一个AbstractProxyInvoker。

-->protocol.export
-->Protocol$Adaptive.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm");
-->extension.export(arg0);
-->ProtocolListenerWrapper.export(Invoker<T> invoker)
-->protocol.export(
-->buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
-->List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), "service.filter", "provider");//创建8个filter(调用链)
-->InjvmProtocol.export
-->new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
--> return  new ListenerExporterWrapper<T>()

class InjvmExporter<T> extends AbstractExporter<T> {
private final String key;
private final Map<String, Exporter<?>> exporterMap;
/**
* @param invoker
* @param key  暴露的服务(接口)
* @param exporterMap value : Invoker对象和filter
*/
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
}
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
}

本地暴露的最终目的就是将要暴露的服务和服务Invoker对象存储到exporterMap中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: