您的位置:首页 > 其它

Dubbo服务再暴露

2017-04-18 19:56 176 查看
import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.net.Socket;

import java.net.SocketAddress;

import java.net.UnknownHostException;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.HashMap;

import java.util.HashSet;

import java.util.List;

import java.util.Map;

import java.util.Set;

import java.util.UUID;

import javax.annotation.Resource;

import org.I0Itec.zkclient.ZkClient;

import org.apache.log4j.Logger;

import org.springframework.beans.factory.BeanFactoryUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationContext;

import org.springframework.stereotype.Service;

import com.alibaba.dubbo.common.Constants;

import com.alibaba.dubbo.common.URL;

import com.alibaba.dubbo.common.Version;

import com.alibaba.dubbo.common.bytecode.Wrapper;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

import com.alibaba.dubbo.common.utils.ConfigUtils;

import com.alibaba.dubbo.common.utils.NetUtils;

import com.alibaba.dubbo.common.utils.StringUtils;

import com.alibaba.dubbo.config.AbstractServiceConfig;

import com.alibaba.dubbo.config.ApplicationConfig;

import com.alibaba.dubbo.config.ModuleConfig;

import com.alibaba.dubbo.config.MonitorConfig;

import com.alibaba.dubbo.config.ProtocolConfig;

import com.alibaba.dubbo.config.ProviderConfig;

import com.alibaba.dubbo.config.RegistryConfig;

import com.alibaba.dubbo.config.support.Parameter;

import com.alibaba.dubbo.rpc.Exporter;

import com.alibaba.dubbo.rpc.Invoker;

import com.alibaba.dubbo.rpc.Protocol;

import com.alibaba.dubbo.rpc.ProxyFactory;

import com.alibaba.dubbo.rpc.cluster.ConfiguratorFactory;

/**

* 服务状态节点恢复

* 描述: zookeeper分布式管理dubbo服务

* 1、服务正常启动,因外界条件将其相关ip+port端口下的服务删掉

* 2、针对已删除的服务节点再次暴露供消费者调用

* 版权: Copyright (c) 2016

* 作者: kun.tan@

* 版本: 1.0

* 创建日期: 2017年4月14日

* 创建时间: 上午10:26:36

*/

@Service("serverRecoverService")

public class ServerRecoverServiceImpl extends AbstractServiceConfig implements ServerRecoverService

{

/**
*
*/
private static final long serialVersionUID = 1L;

Logger logger = Logger.getLogger(ServerRecoverServiceImpl.class);

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

@Resource
private ApplicationContext applicationContext;
protected List<ProtocolConfig> protocols;

@Autowired
private ZkClient zkClient;

@Autowired
private ProviderConfig providerConfig;

private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();

private transient volatile boolean exported;

private transient volatile boolean unexported;

/**
 * Reports the current value of the {@code exported} flag.
 *
 * @return {@code true} if this instance is flagged as exported
 */
@Parameter(excluded = true)
public boolean isExported()
{
    return this.exported;
}

/**
 * Reports the current value of the {@code unexported} flag.
 *
 * @return {@code true} if this instance is flagged as unexported
 */
@Parameter(excluded = true)
public boolean isUnexported()
{
    return this.unexported;
}

/**
 * Returns the provider-level configuration used as a fallback for host, port,
 * context path and other settings.
 *
 * @return the injected or explicitly set provider config; may be {@code null}
 */
public ProviderConfig getProviderConfigConfig()
{
    return this.providerConfig;
}

/**
 * Replaces the provider-level configuration held by this service.
 *
 * @param providerConfig the provider config to use from now on
 */
public void setProviderConfig(ProviderConfig providerConfig)
{
    this.providerConfig = providerConfig;
}

/**
 * Re-exports the given service interfaces over every configured protocol so that
 * consumers can call them again after their registry nodes were removed.
 *
 * @param jsonServerNodes JSON array of fully-qualified service interface names to export,
 *        e.g. {@code ["com.example.service.CodeService","com.example.service.PkService"]};
 *        a {@code null} or blank value is ignored with a warning
 * @throws IllegalStateException if no protocol configuration is available
 */
public void serverNodeRecover(String jsonServerNodes)
{
    if (jsonServerNodes == null || jsonServerNodes.trim().length() == 0)
    {
        logger.warn("No service nodes supplied, nothing to re-export");
        return;
    }
    init();
    // Protocols the services are exported over.
    protocols = getProtocols();
    if (protocols == null || protocols.isEmpty())
    {
        // Previously this surfaced as a bare NullPointerException from the loop below.
        throw new IllegalStateException("No dubbo protocol config found, cannot re-export services: " + jsonServerNodes);
    }
    List<URL> registryURLs = loadRegistries(true);
    // Raw Set.class is the only way to request a Set here; the elements are the interface names.
    @SuppressWarnings("unchecked")
    Set<String> serviceNodeSet = JsonUtil.readValue(jsonServerNodes, Set.class);
    if (serviceNodeSet == null || serviceNodeSet.isEmpty())
    {
        logger.warn("No service nodes parsed from " + jsonServerNodes + ", nothing to re-export");
        return;
    }
    try
    {
        for (String node : serviceNodeSet)
        {
            for (ProtocolConfig protocolConfig : protocols)
            {
                exportNode(node, registryURLs, protocolConfig);
            }
        }
    }
    finally
    {
        // Record that exporters exist (even after a partial failure); without this,
        // unexport() returns at its "not exported" guard and never releases them.
        if (!exporters.isEmpty())
        {
            exported = true;
            unexported = false;
        }
    }
}

/**
 * Exports one service interface over one protocol and, when registries are given and
 * registration is enabled, registers it with each of them.
 *
 * @param interfaceName fully-qualified name of the service interface to export
 * @param registryURLs registry addresses to register with; when {@code null} or empty the
 *        service is exported directly without going through a registry
 * @param protocolConfig protocol to export over
 * @throws IllegalArgumentException if {@code interfaceName} is {@code null} or empty
 * @throws IllegalStateException if the interface class cannot be loaded
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exportNode(String interfaceName, List<URL> registryURLs, ProtocolConfig protocolConfig)
{
    // Resolve the interface first so a bad name fails with a clear message instead of a
    // NullPointerException further down.
    Class<?> interfaceClass = getInterfaceClass(interfaceName);
    if (interfaceClass == null)
    {
        throw new IllegalArgumentException("interfaceName must not be null or empty");
    }

    String name = protocolConfig.getName();
    if (name == null || name.length() == 0)
    {
        name = "dubbo";
    }

    String host = protocolConfig.getHost();
    if (providerConfig != null && (host == null || host.length() == 0))
    {
        host = providerConfig.getHost();
    }
    // No usable host configured: detect one and remember that any local address is acceptable.
    boolean anyhost = NetUtils.isInvalidLocalHost(host);
    if (anyhost)
    {
        host = detectLocalHost(registryURLs);
    }

    Integer port = protocolConfig.getPort();
    if (providerConfig != null && (port == null || port == 0))
    {
        port = providerConfig.getPort();
    }
    if (port == null || port == 0)
    {
        // The URL constructor takes a primitive int, so a null port would fail on unboxing;
        // fall back to the default port of the protocol extension.
        port = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
    }

    Map<String, String> map = new HashMap<String, String>();
    if (anyhost)
    {
        // Only advertise anyhost when the host really was auto-detected.
        map.put(Constants.ANYHOST_KEY, "true");
    }
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    int pid = ConfigUtils.getPid();
    if (pid > 0)
    {
        map.put(Constants.PID_KEY, String.valueOf(pid));
    }

    String revision = Version.getVersion(interfaceClass, getValidation());
    if (revision != null && revision.length() > 0)
    {
        map.put("revision", revision);
    }
    String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    if (methods.length == 0)
    {
        logger.warn("NO method found in service interface " + interfaceClass.getName());
        map.put("methods", Constants.ANY_VALUE);
    }
    else
    {
        map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    }

    // The bean implementing the interface is looked up by type from the Spring context.
    Object ref = applicationContext.getBean(interfaceClass);

    if (!ConfigUtils.isEmpty(token))
    {
        if (ConfigUtils.isDefault(token))
        {
            map.put("token", UUID.randomUUID().toString());
        }
        else
        {
            map.put("token", token);
        }
    }
    if ("injvm".equals(protocolConfig.getName()))
    {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }

    // Build the export URL.
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && providerConfig != null)
    {
        contextPath = providerConfig.getContextpath();
    }
    String path = (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + interfaceName;
    URL url = new URL(name, host, port, path, map);
    ExtensionLoader<ConfiguratorFactory> configurators = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class);
    if (configurators.hasExtension(url.getProtocol()))
    {
        url = configurators.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // scope=none: do not export at all.
    if (Constants.SCOPE_NONE.equalsIgnoreCase(scope))
    {
        return;
    }
    // Unless scope=remote (remote only), also export inside the JVM.
    if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope))
    {
        exportLocal(url, ref, interfaceClass);
    }
    // scope=local means in-JVM only, so there is nothing more to do.
    if (Constants.SCOPE_LOCAL.equalsIgnoreCase(scope))
    {
        return;
    }

    if (logger.isInfoEnabled())
    {
        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
    }
    if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true))
    {
        for (URL registryURL : registryURLs)
        {
            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
            URL monitorUrl = loadMonitor(registryURL);
            if (monitorUrl != null)
            {
                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
            }
            if (logger.isInfoEnabled())
            {
                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
            }
            // The export URL travels as an encoded parameter of the registry URL.
            URL registryExportUrl = registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString());
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryExportUrl);
            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
        }
    }
    else
    {
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
        Exporter<?> exporter = protocol.export(invoker);
        exporters.add(exporter);
    }
}

/**
 * Finds a usable local address when none is configured: first the JVM's local host,
 * then the local end of a connection to any of the registries, finally
 * {@code NetUtils.getLocalHost()}.
 */
private String detectLocalHost(List<URL> registryURLs)
{
    String host = null;
    try
    {
        host = InetAddress.getLocalHost().getHostAddress();
    }
    catch (UnknownHostException e)
    {
        logger.warn(e.getMessage(), e);
    }
    if (!NetUtils.isInvalidLocalHost(host))
    {
        return host;
    }
    if (registryURLs != null)
    {
        for (URL registryURL : registryURLs)
        {
            Socket socket = new Socket();
            try
            {
                SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                socket.connect(addr, 1000);
                host = socket.getLocalAddress().getHostAddress();
                break;
            }
            catch (Exception e)
            {
                logger.warn(e.getMessage(), e);
            }
            finally
            {
                try
                {
                    socket.close();
                }
                catch (Throwable ignored)
                {
                    // Best-effort close of a probe socket; nothing useful to do on failure.
                }
            }
        }
    }
    return NetUtils.isInvalidLocalHost(host) ? NetUtils.getLocalHost() : host;
}

/**
 * Exports the service inside the JVM by rewriting the given URL to the local protocol.
 * Does nothing when the URL already uses the local protocol.
 *
 * @param url export URL to derive the local URL from
 * @param ref service implementation
 * @param interfaceClass service interface
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private void exportLocal(URL url, Object ref, Class interfaceClass)
{
    if (Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol()))
    {
        return;
    }
    URL injvmUrl = URL.valueOf(url.toFullString())
            .setProtocol(Constants.LOCAL_PROTOCOL)
            .setHost(NetUtils.LOCALHOST)
            .setPort(0);
    Invoker<?> localInvoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, injvmUrl);
    Exporter<?> localExporter = protocol.export(localInvoker);
    exporters.add(localExporter);
    logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}

/**
 * Loads and initializes the class with the given name through the current thread's
 * context class loader.
 *
 * @param interfaceName fully-qualified class name
 * @return the loaded class, or {@code null} when {@code interfaceName} is {@code null} or empty
 * @throws IllegalStateException if the class cannot be found
 */
public Class<?> getInterfaceClass(String interfaceName)
{
    if (interfaceName == null || interfaceName.length() == 0)
    {
        return null;
    }
    try
    {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        return Class.forName(interfaceName, true, contextLoader);
    }
    catch (ClassNotFoundException notFound)
    {
        throw new IllegalStateException(notFound.getMessage(), notFound);
    }
}

/**
 * Unexports every exporter collected so far and marks this instance as unexported.
 * Does nothing unless the {@code exported} flag is set and the {@code unexported}
 * flag is not. A failure to unexport one exporter is logged and does not stop the rest.
 */
public synchronized void unexport()
{
    if (!exported || unexported)
    {
        return;
    }
    for (Exporter<?> current : exporters)
    {
        try
        {
            current.unexport();
        }
        catch (Throwable failure)
        {
            logger.warn("unexpected err when unexport" + current, failure);
        }
    }
    exporters.clear();
    unexported = true;
}

/**
 * Re-populates this config from the Spring context: application, module, registries and
 * monitor are filled in only when neither this object nor its fallbacks already supply
 * them; protocols are always reloaded. Only beans whose {@code isDefault()} is
 * {@code null} or {@code true} are considered.
 *
 * @throws IllegalStateException if more than one default application, module or monitor
 *         config is found
 */
public void init()
{
    // Every lookup below goes through the Spring context; without one nothing can be filled in.
    if (applicationContext == null)
    {
        return;
    }
    ProviderConfig provider = getProviderConfigConfig();

    // Application
    if (getApplication() == null && (provider == null || provider.getApplication() == null))
    {
        Map<String, ApplicationConfig> found = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
        if (found != null && !found.isEmpty())
        {
            ApplicationConfig chosen = null;
            for (ApplicationConfig candidate : found.values())
            {
                if (Boolean.FALSE.equals(candidate.isDefault()))
                {
                    continue;
                }
                if (chosen != null)
                {
                    throw new IllegalStateException("Duplicate application configs: " + chosen + " and " + candidate);
                }
                chosen = candidate;
            }
            if (chosen != null)
            {
                setApplication(chosen);
            }
        }
    }

    // Module
    if (getModule() == null && (provider == null || provider.getModule() == null))
    {
        Map<String, ModuleConfig> found = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
        if (found != null && !found.isEmpty())
        {
            ModuleConfig chosen = null;
            for (ModuleConfig candidate : found.values())
            {
                if (Boolean.FALSE.equals(candidate.isDefault()))
                {
                    continue;
                }
                if (chosen != null)
                {
                    throw new IllegalStateException("Duplicate module configs: " + chosen + " and " + candidate);
                }
                chosen = candidate;
            }
            if (chosen != null)
            {
                setModule(chosen);
            }
        }
    }

    // Registries: only when this object, the provider and the application all have none.
    boolean ownRegistriesMissing = getRegistries() == null || getRegistries().size() == 0;
    if (ownRegistriesMissing
            && (provider == null || provider.getRegistries() == null || provider.getRegistries().size() == 0)
            && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0))
    {
        Map<String, RegistryConfig> found = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
        if (found != null && !found.isEmpty())
        {
            List<RegistryConfig> defaults = new ArrayList<RegistryConfig>();
            for (RegistryConfig candidate : found.values())
            {
                if (!Boolean.FALSE.equals(candidate.isDefault()))
                {
                    defaults.add(candidate);
                }
            }
            if (!defaults.isEmpty())
            {
                super.setRegistries(defaults);
            }
        }
    }

    // Monitor
    if (getMonitor() == null
            && (provider == null || provider.getMonitor() == null)
            && (getApplication() == null || getApplication().getMonitor() == null))
    {
        Map<String, MonitorConfig> found = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
        if (found != null && !found.isEmpty())
        {
            MonitorConfig chosen = null;
            for (MonitorConfig candidate : found.values())
            {
                if (Boolean.FALSE.equals(candidate.isDefault()))
                {
                    continue;
                }
                if (chosen != null)
                {
                    throw new IllegalStateException("Duplicate monitor configs: " + chosen + " and " + candidate);
                }
                chosen = candidate;
            }
            if (chosen != null)
            {
                setMonitor(chosen);
            }
        }
    }

    // Protocols: reloaded unconditionally.
    Map<String, ProtocolConfig> foundProtocols = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
    if (foundProtocols != null && !foundProtocols.isEmpty())
    {
        List<ProtocolConfig> defaults = new ArrayList<ProtocolConfig>();
        for (ProtocolConfig candidate : foundProtocols.values())
        {
            if (!Boolean.FALSE.equals(candidate.isDefault()))
            {
                defaults.add(candidate);
            }
        }
        if (!defaults.isEmpty())
        {
            super.setProtocols(defaults);
        }
    }
}


}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: