您的位置:首页 > 其它

Curator教程(三)服务注册&发现(Service Discovery)

2016-12-10 16:10 639 查看

服务发现

随着微服务的日趋火爆,系统拆分之后,各个服务模块的Provider和Consumer之间需要能够查找到对方,我们称之为服务发现。

In SOA/distributed systems, services need to find each other. i.e. a web service might need to find a caching service, etc. DNS can be used for this but it is nowhere near flexible enough for services that are constantly changing. A Service Discovery system provides a mechanism for:

Services to register their availability

Locating a single instance of a particular service

Notifying when the instances of a service change

Curator Service Discovery

Curator Service Discovery就是为了解决这个问题而生的,它对此抽象出了ServiceInstance、ServiceProvider、ServiceDiscovery三个接口,通过它我们可以很轻易的实现Service Discovery。

实践

假如我们需要对外提供一个服务(com.bytebeats.service.HelloService),为了保证服务的高可用行 HelloService服务部署在N台机器上,现在Consumer想要请求这个服务,如何保证HelloService可以灵活部署(随意 增加/删除 机器)?

思路

首先,HelloService Provider在启动时向Zookeeper 注册本机提供的 服务名称端口号地址;Consumer启动的时候先查询Zookeeper获取到服务Provider列表,然后通过负载均衡算法(随机、RoundRobin、一致性Hash) 选择一台机器去调用服务。

代码实现

Maven依赖

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>2.11.1</version>
</dependency>


ServerApp.java

package com.bytebeats.zookeeper.curator.discovery;

import com.bytebeats.zookeeper.curator.CuratorUtils;
import com.bytebeats.zookeeper.curator.discovery.domain.ServerPayload;
import com.bytebeats.zookeeper.util.JsonUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;

import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
* 模拟服务提供者
*
* @author Ricky Fung
* @create 2016-12-08 19:24
*/
public class ServerApp {

public static final String BASE_PATH = "services";
public static final String SERVICE_NAME = "com.bytebeats.service.HelloService";

public static void main(String[] args) {

CuratorFramework client = null;
ServiceRegistry serviceRegistry = null;
try{
client = CuratorUtils.getCuratorClient();
client.start();

serviceRegistry = new ServiceRegistry(client, BASE_PATH);
serviceRegistry.start();

//注册两个service 实例
ServiceInstance<ServerPayload> host1 = ServiceInstance.<ServerPayload>builder()
.id("host1")
.name(SERVICE_NAME)
.port(21888)
.address("10.99.10.1")
.payload(new ServerPayload("HZ", 5))
.uriSpec(new UriSpec("{scheme}://{address}:{port}"))
.build();

serviceRegistry.registerService(host1);

ServiceInstance<ServerPayload> host2 = ServiceInstance.<ServerPayload>builder()
.id("host2")
.name(SERVICE_NAME)
.port(21888)
.address("10.99.1.100")
.payload(new ServerPayload("QD", 3))
.uriSpec(new UriSpec("{scheme}://{address}:{port}"))
.build();

serviceRegistry.registerService(host2);

System.out.println("register service success...");

TimeUnit.MINUTES.sleep(1);

Collection<ServiceInstance<ServerPayload>> list = serviceRegistry.queryForInstances(SERVICE_NAME);
if(list!=null && list.size()>0){
System.out.println("service:"+SERVICE_NAME+" provider list:"+ JsonUtils.toJson(list));
} else {
System.out.println("service:"+SERVICE_NAME+" provider is empty...");
}

} catch (Exception e) {
e.printStackTrace();
} finally {
if(serviceRegistry!=null){
try {
serviceRegistry.close();
} catch (Exception e) {
e.printStackTrace();
}
}
client.close();
}

}
}


ServiceRegistry.java

package com.bytebeats.zookeeper.curator.discovery;

import com.bytebeats.zookeeper.curator.discovery.domain.ServerPayload;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.util.Collection;

/**
* 服务注册
*
* @author Ricky Fung
* @create 2016-12-08 19:16
*/
public class ServiceRegistry {

private ServiceDiscovery<ServerPayload> serviceDiscovery;

private final CuratorFramework client;

public ServiceRegistry(CuratorFramework client, String basePath){
this.client = client;
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class)
.client(client)
.serializer(new JsonInstanceSerializer<>(ServerPayload.class))
.basePath(basePath)
.build();
}

public void updateService(ServiceInstance<ServerPayload> instance) throws Exception {
serviceDiscovery.updateService(instance);
}

public void registerService(ServiceInstance<ServerPayload> instance) throws Exception {
serviceDiscovery.registerService(instance);
}

public void unregisterService(ServiceInstance<ServerPayload> instance) throws Exception {
serviceDiscovery.unregisterService(instance);
}

public Collection<ServiceInstance<ServerPayload>> queryForInstances(String name) throws Exception {
return serviceDiscovery.queryForInstances(name);
}

public ServiceInstance<ServerPayload> queryForInstance(String name, String id) throws Exception {
return serviceDiscovery.queryForInstance(name, id);
}

public void start() throws Exception {
serviceDiscovery.start();
}

public void close() throws Exception {
serviceDiscovery.close();
}
}


Consumer

package com.bytebeats.zookeeper.curator.discovery;

import com.bytebeats.zookeeper.curator.CuratorUtils;
import com.bytebeats.zookeeper.curator.discovery.domain.ServerPayload;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceInstance;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* 模拟服务消费者
*
* @author Ricky Fung
* @create 2016-12-08 20:13
*/
public class ClientApp {

public static void main(String[] args) {

CuratorFramework client = null;
ServiceDiscover serviceDiscover = null;
try{
client = CuratorUtils.getCuratorClient();
client.start();

serviceDiscover = new ServiceDiscover(client, ServerApp.BASE_PATH);   //服务发现
serviceDiscover.start();

for(int i=0;i<10;i++){

ServiceInstance<ServerPayload> instance = serviceDiscover.getServiceProvider(ServerApp.SERVICE_NAME);

System.out.println("service:"+ServerApp.SERVICE_NAME+" instance id:"+instance.getId()+
", name:"+instance.getName()+ ", address:"+instance.getAddress()+", port:"+instance.getPort());

TimeUnit.SECONDS.sleep(3);
}

} catch (Exception e) {
e.printStackTrace();
} finally {
if(serviceDiscover!=null){
try {
serviceDiscover.close();
} catch (IOException e) {
e.printStackTrace();
}
}
client.close();
}
}
}


ServiceDiscover.java

package com.bytebeats.zookeeper.curator.discovery;

import com.bytebeats.zookeeper.curator.discovery.domain.ServerPayload;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 服务发现
*
* @author Ricky Fung
* @create 2016-12-08 20:04
*/
public class ServiceDiscover {
private ServiceDiscovery<ServerPayload> serviceDiscovery;
private final ConcurrentHashMap<String, ServiceProvider<ServerPayload>> serviceProviderMap = new ConcurrentHashMap<>();

public ServiceDiscover(CuratorFramework client , String basePath){
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerPayload.class)
.client(client)
.basePath(basePath)
.serializer(new JsonInstanceSerializer<>(ServerPayload.class))
.build();
}

/**
* Note: When using Curator 2.x (Zookeeper 3.4.x) it's essential that service provider objects are cached by your application and reused.
* Since the internal NamespaceWatcher objects added by the service provider cannot be removed in Zookeeper 3.4.x,
* creating a fresh service provider for each call to the same service will eventually exhaust the memory of the JVM.
*/
public ServiceInstance<ServerPayload> getServiceProvider(String serviceName) throws Exception {
ServiceProvider<ServerPayload> provider = serviceProviderMap.get(serviceName);
if (provider == null) {
provider = serviceDiscovery.serviceProviderBuilder().
serviceName(serviceName).
providerStrategy(new RandomStrategy<ServerPayload>())
.build();

ServiceProvider<ServerPayload> oldProvider = serviceProviderMap.putIfAbsent(serviceName, provider);
if (oldProvider != null) {
provider = oldProvider;
}else {
provider.start();
}
}

return provider.getInstance();
}

public void start() throws Exception {
serviceDiscovery.start();
}

public void close() throws IOException {

for (Map.Entry<String, ServiceProvider<ServerPayload>> me : serviceProviderMap.entrySet()){
try{
me.getValue().close();
}catch (Exception e){
e.printStackTrace();
}
}
serviceDiscovery.close();
}
}


点此下载完整demo:https://github.com/TiFG/zookeeper-samples

参考

Service Discovery:http://curator.apache.org/curator-x-discovery/index.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  curator 服务发现
相关文章推荐