您的位置:首页 > 其它

基于zookeeper、连接池实现的Thrift服务负载均衡和服务发现

2017-03-31 18:51 741 查看
转载自:http://blog.csdn.net/zhu_tianwei/article/details/44115667

对于Thrift服务化的改造,主要是客户端,可以从如下几个方面进行:

       1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里使用zookeeper,采用curator-recipes工具类进行处理服务的注册与发现。

       2.客户端使用连接池对服务调用进行管理,提升性能,使用Apache Commons项目commons-pool2,可以大大减少代码的复杂度。

       3.关于Failover/LoadBalance,由于zookeeper的watcher,当服务端不可用是及时通知客户端,并移除不可用的服务节点,而LoadBalance有很多算法,这里采用随机加权方式,也是常有的负载算法。

1.Zookeeper的客户端

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZookeeperFactory {

private String zkHosts;
private int sessionTimeout = 30000;// session超时
private int connectionTimeout = 30000;

private String namespace;// 全局path前缀,常用来区分不同的应用

private final static String ROOT = "rpc";

private CuratorFramework zkClient;

public void setZkHosts(String zkHosts) {
this.zkHosts = zkHosts;
}

public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}

public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
}

public synchronized CuratorFramework getZkClient() throws Exception {
if (zkClient == null) {
zkClient = create();
zkClient.start();
return zkClient;
}
return zkClient;
}

private CuratorFramework create() throws Exception {
if (StringUtils.isEmpty(namespace)) {
namespace = ROOT;
} else {
namespace = ROOT + "/" + namespace;
}
return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
}

private static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
return builder.connectString(connectString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(30000)
.canBeReadOnly(true)
.namespace(namespace)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.defaultData(null)
.build();
}

public void close() {
if (zkClient != null) {
zkClient.close();
}
}
}

2. 定义发布服务接口,并实现将服务信息(服务接口、版本号,IP、port、weight)发布到zookeeper中。

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;

/**
*  注册服务列表到Zookeeper
*/
public class ThriftServerAddressRegister {
private CuratorFramework zkClient;

public ThriftServerAddressRegister(){}

public ThriftServerAddressRegister(CuratorFramework zkClient){
this.zkClient = zkClient;
}

public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
}

public void register(String service, String version, String address) {
if(zkClient.getState() == CuratorFrameworkState.LATENT){
zkClient.start();
}
if(StringUtils.isEmpty(version)){
version="1.0.0";
}
//临时节点
try {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/"+service+"/"+version+"/"+address);
} catch (UnsupportedEncodingException e) {
System.out.println("register service address to zookeeper exception.");
} catch (Exception e) {
System.out.println("register service address to zookeeper exception.");
}
}

public void close(){
zkClient.close();
}
}

3. 客户端发现服务, 定义获取服务地址接口

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import java.net.InetSocketAddress;
import java.util.*;

/**
* thrift server-service地址提供者,以便构建客户端连接池
*/
public class ThriftServerAddressProvider {
// 注册服务
private String service;
// 服务版本号
private String version = "1.0.0";

private PathChildrenCache cachedPath;

private CuratorFramework zkClient;

// 用来保存当前provider所接触过的地址记录
// 当zookeeper集群故障时,可以使用trace中地址,作为"备份"
private Set<String> trace = new HashSet<>();

private final List<InetSocketAddress> container = new ArrayList<>();

private Queue<InetSocketAddress> inner = new LinkedList<>();

private Object lock = new Object();

private static final Integer DEFAULT_WEIGHT = 1;// 默认权重

public void setService(String service) {
this.service = service;
}

public void setVersion(String version) {
this.version = version;
}

public ThriftServerAddressProvider() {
}

public ThriftServerAddressProvider(CuratorFramework zkClient) {
this.zkClient = zkClient;
}

public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
}

public void init() throws Exception {
// 如果zk尚未启动,则启动
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
buildPathChildrenCache(zkClient, getServicePath(), true);
cachedPath.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}

private String getServicePath() {
return "/" + service + "/" + version;
}

private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
cachedPath = new PathChildrenCache(client, path, cacheData);
cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
System.out.println("Connection is reconection.");
break;
case CONNECTION_SUSPENDED:
System.out.println("Connection is suspended.");
break;
case CONNECTION_LOST:
System.out.println("Connection error,waiting...");
return;
default:
//
}
// 任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
cachedPath.rebuild();
rebuild();
}

protected void rebuild() throws Exception {
List<ChildData> children = cachedPath.getCurrentData();
if (children == null || children.isEmpty()) {
// 有可能所有的thrift server都与zookeeper断开了链接
// 但是,有可能,thrift client与thrift server之间的网络是良好的
// 因此此处是否需要清空container,是需要多方面考虑的.
container.clear();
System.out.println("thrift server-cluster error....");
return;
}
List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
String path = null;
for (ChildData data : children) {
path = data.getPath();
System.out.println("get path:" + path);
path = path.substring(getServicePath().length() + 1);
System.out.println("get serviceAddress:" + path);
String address = new String(path.getBytes(), "utf-8");
current.addAll(transfer(address));
trace.add(address);
}
Collections.shuffle(current);
synchronized (lock) {
container.clear();
container.addAll(current);
inner.clear();
inner.addAll(current);
}
}
});
}

private List<InetSocketAddress> transfer(String address) {
String[] hostname = address.split(":");
Integer weight = DEFAULT_WEIGHT;
if (hostname.length == 3) {
weight = Integer.valueOf(hostname[2]);
}
String ip = hostname[0];
Integer port = Integer.valueOf(hostname[1]);
List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
// 根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载
for (int i = 0; i < weight; i++) {
result.add(new InetSocketAddress(ip, port));
}
return result;
}

public List<InetSocketAddress> findServerAddressList() {
return Collections.unmodifiableList(container);
}

public synchronized InetSocketAddress selector() {
if (inner.isEmpty()) {
if (!container.isEmpty()) {
inner.addAll(container);
} else if (!trace.isEmpty()) {
synchronized (lock) {
for (String hostname : trace) {
container.addAll(transfer(hostname));
}
Collections.shuffle(container);
inner.addAll(container);
}
}
}
return inner.poll();
}

public void close() {
try {
cachedPath.close();
zkClient.close();
} catch (Exception e) {
}
}

public String getService() {
return service;
}
}
这里,zkClient会监听服务注册路径上的节点变化,当有变动时,随时更新服务列表。

4. 客户端获取服务代理及连接池实现

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.net.InetSocketAddress;

/**
* 连接池,thrift-client for spring
*/
public class ThriftClientPoolFactory implements PooledObjectFactory<TServiceClient> {

private final ThriftServerAddressProvider serverAddressProvider;
private final TServiceClientFactory<TServiceClient> clientFactory;
private PoolOperationCallBack callback;

protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
this.serverAddressProvider = addressProvider;
this.clientFactory = clientFactory;
}

protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
PoolOperationCallBack callback) throws Exception {
this.serverAddressProvider = addressProvider;
this.clientFactory = clientFactory;
this.callback = callback;
}

static interface PoolOperationCallBack {
// 销毁client之前执行
void destroy(TServiceClient client);

// 创建成功是执行
void make(TServiceClient client);
}

@Override
public PooledObject<TServiceClient> makeObject() throws Exception {
InetSocketAddress address = serverAddressProvider.selector();

TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000);//10000ms超时
TTransport transport = new TFramedTransport(tsocket);
TProtocol protocol = new TBinaryProtocol(transport);
TServiceClient client = this.clientFactory.getClient(protocol);
transport.open();
if (callback != null) {
try {
callback.make(client);
} catch (Exception e) {
}
}
return new DefaultPooledObject<>(client);
}

@Override
public void destroyObject(PooledObject<TServiceClient> p) throws Exception {
TServiceClient client = p.getObject();

if (callback != null) {
try {
callback.destroy(client);
} catch (Exception e) {
}
}
TTransport pin = client.getInputProtocol().getTransport();
pin.close();
TTransport pout = client.getOutputProtocol().getTransport();
pout.close();
}

@Override
public boolean validateObject(PooledObject<TServiceClient> p) {
TServiceClient client = p.getObject();
TTransport pin = client.getInputProtocol().getTransport();
TTransport pout = client.getOutputProtocol().getTransport();
return pin.isOpen() && pout.isOpen();
}

@Override
public void activateObject(PooledObject<TServiceClient> p) throws Exception {
}

@Override
public void passivateObject(PooledObject<TServiceClient> p) throws Exception {
}
}
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;

/**
* 客户端代理
*/
public class ThriftServiceClientProxyFactory {
private ThriftServerAddressProvider serverAddressProvider;

private GenericObjectPool<TServiceClient> pool;

private ThriftClientPoolFactory.PoolOperationCallBack callback = new ThriftClientPoolFactory.PoolOperationCallBack() {
@Override
public void make(TServiceClient client) {
System.out.println("create");
}

@Override
public void destroy(TServiceClient client) {
System.out.println("destroy");
}
};

public GenericObjectPool<TServiceClient> getPool() {
return pool;
}

public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
this.serverAddressProvider = serverAddressProvider;
}

public void init() throws Exception {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// 加载Iface接口
Class<?> objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");

// 加载Client.Factory类
Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);

GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(100);
poolConfig.setMaxIdle(100);
poolConfig.setMinIdle(100);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(false);
poolConfig.setTestOnCreate(false);
poolConfig.setTestWhileIdle(true);
poolConfig.setTestOnReturn(false);//其他参数设置自行百度

pool = new GenericObjectPool<>(clientPool, poolConfig);
}
public void close() {
if (serverAddressProvider != null) {
serverAddressProvider.close();
}
}
}

5. 服务端服务注册

import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

/**
* 服务端注册服务工厂
*/
public class ThriftServiceServerFactory {

private String ip = "127.0.0.1";
private Integer port = 8299;
// 优先级
private Integer weight = 1;// default
// 服务实现类
private Object service;// serice实现类
//服务版本号
private String version;

private ServerThread serverThread;

//服务注册
private ThriftServerAddressRegister thriftServerAddressRegister;

public void setPort(Integer port) {
this.port = port;
}

public void setWeight(Integer weight) {
this.weight = weight;
}

public void setService(Object service) {
this.service = service;
}

public void setVersion(String version) {
this.version = version;
}

public void init() throws Exception {
String hostname = ip + ":" + port + ":" + weight;
Class<?> serviceClass = service.getClass();
// 获取实现类接口
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
TProcessor processor = null;
String serviceName = null;
for (Class<?> clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
serviceName = clazz.getEnclosingClass().getName();
String pname = serviceName + "$Processor";
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> pclass = classLoader.loadClass(pname);
if (!TProcessor.class.isAssignableFrom(pclass)) {
continue;
}
Constructor<?> constructor = pclass.getConstructor(clazz);
processor = (TProcessor) constructor.newInstance(service);
break;
} catch (Exception e) {
//
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
//需要单独的线程,因为serve方法是阻塞的.
serverThread = new ServerThread(processor, port);
serverThread.start();
// 注册服务
if (thriftServerAddressRegister != null) {
thriftServerAddressRegister.register(serviceName, version, hostname);
}
}

class ServerThread extends Thread {
private TServer server;

ServerThread(TProcessor processor, int port) throws Exception {
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
TProcessorFactory processorFactory = new TProcessorFactory(processor);
tArgs.processorFactory(processorFactory);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
server = new TThreadedSelectorServer(tArgs);
}

@Override
public void run() {
try {
//启动服务
server.serve();
} catch (Exception e) {
}
}

public void stopServer() {
server.stop();
}
}

public void close() {
serverThread.stopServer();
}
}

6.客户端Demo(示例,未运行)

CuratorFramework zkClientServer = new ZookeeperFactory().getZkClient();
ThriftServerAddressProvider addressProvider = new ThriftServerAddressProvider(zkClientServer);
addressProvider.setService("");
// ...属性设置
addressProvider.init();

ThriftServiceClientProxyFactory clientProxyFactory = new ThriftServiceClientProxyFactory();
clientProxyFactory.setServerAddressProvider(addressProvider);
clientProxyFactory.init();

GenericObjectPool<TServiceClient> thriftClientPool =clientProxyFactory.getPool();
TServiceClient thriftClient = thriftClientPool.borrowObject();
//。。。
thriftClientPool.returnObject(thriftClient);
//internalPool.invalidateObject(thriftClient);


可以改进的地方:

       thriftClientPool通过随机方式基本上实现了负载均衡,但是当某一台机器某段时间性能变差(每秒能处理的请求量要低于其它),导致出现超时时,通常会在pool中关掉对应的client,但是从代码上看出,thriftClientPool中该机器对应的client可能不止一个,而且GenericObjectPool会定时检查pool中的对象数量是否满足我们的设置(GenericObjectPoolConfig的设置),如果不够会调用ThriftServerAddressProvider.selector()来获取服务IP来新建连接填补pool。

       于是我们可以做些改进,对有问题的client对应的服务IP,反馈标记出来,然后在ThriftServerAddressProvider的List<InetSocketAddress>中调整其的权重,比如相较于其他服务IP的占比数小于10%或则更多,这样就间接的降低了有性能问题服务器的QPS,避免了反复删毁、重建连接。同时,还要有恢复机制,当机器性能恢复后,我们需要将其QPS提升上来。

       一点想法:可以封装下thriftClientPool,当要删毁连接时获取其IP,在ThriftServerAddressProvider反馈出来,如果GenericObjectPool填补时不再使用该IP,这样应该可以降低该服务器的QPS。与此同时,我们也需要重构pool,一来避免某些机器qps一直很高(不断填补了问题机器的缺),二来可以做到服务恢复机制。于是可以定时重构该pool,到了时间点,每次回到pool的client都销毁掉,这样GenericObjectPool会重建client,这时会从Queue<InetSocketAddress>一个一个poll服务IP新建client,Queue<InetSocketAddress>的构建依赖List<InetSocketAddress>,只需要对List<InetSocketAddress>做些文章即可,比如把上个周期内有问题的IP权重继续下调,或则恢复,这就看具体业务了。

       以上只是个人想法,不足之处欢迎一起探讨。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: