Thrift service load balancing and service discovery based on Zookeeper and connection pooling
2017-03-31 18:51
Reposted from: http://blog.csdn.net/zhu_tianwei/article/details/44115667
Turning Thrift into a proper service framework is mostly a client-side effort, and can proceed along the following lines:
1. Server-side service registration with automatic client-side discovery, so configuration never needs manual editing. Zookeeper is used here, with the curator-recipes library handling registration and discovery.
2. The client manages service calls through a connection pool to improve performance; the Apache Commons project commons-pool2 greatly reduces the code's complexity.
3. Failover/load balancing: thanks to zookeeper watchers, the client is notified promptly when a server becomes unavailable and removes the dead node from its list. Many load-balancing algorithms exist; weighted random selection, a common choice, is used here.
1. The Zookeeper client
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZookeeperFactory {

    private String zkHosts;
    private int sessionTimeout = 30000;    // session timeout in ms
    private int connectionTimeout = 30000; // connection timeout in ms
    private String namespace;              // global path prefix, commonly used to separate applications
    private final static String ROOT = "rpc";
    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public synchronized CuratorFramework getZkClient() throws Exception {
        if (zkClient == null) {
            zkClient = create();
            zkClient.start();
        }
        return zkClient;
    }

    private CuratorFramework create() throws Exception {
        if (StringUtils.isEmpty(namespace)) {
            namespace = ROOT;
        } else {
            namespace = ROOT + "/" + namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    private static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout) // was hard-coded to 30000; now uses the configured value
                .canBeReadOnly(true)
                .namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null)
                .build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
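A minimal usage sketch (the host string and namespace are placeholders):

ZookeeperFactory zookeeperFactory = new ZookeeperFactory();
zookeeperFactory.setZkHosts("127.0.0.1:2181"); // placeholder ensemble address
zookeeperFactory.setNamespace("demo");         // effective namespace becomes "rpc/demo"
CuratorFramework zkClient = zookeeperFactory.getZkClient(); // lazily builds and starts the client
// ... use zkClient ...
zookeeperFactory.close();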
2. Define the service-publishing interface and implement publishing the service information (service interface, version, IP, port, weight) to zookeeper.
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registers the service list to Zookeeper.
 */
public class ThriftServerAddressRegister {

    private static final Logger logger = LoggerFactory.getLogger(ThriftServerAddressRegister.class);

    private CuratorFramework zkClient;

    public ThriftServerAddressRegister() {
    }

    public ThriftServerAddressRegister(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void register(String service, String version, String address) {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        if (StringUtils.isEmpty(version)) {
            version = "1.0.0";
        }
        // Ephemeral node: it disappears automatically when the server's session ends,
        // which is what lets clients detect dead servers.
        try {
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath("/" + service + "/" + version + "/" + address);
        } catch (Exception e) {
            // originally two identical System.out.println catches; consolidated onto the (previously unused) slf4j logger
            logger.error("register service address to zookeeper exception.", e);
        }
    }

    public void close() {
        zkClient.close();
    }
}
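As a concrete illustration (the service name and address are hypothetical): with the ZookeeperFactory namespace left at its default "rpc", the call below creates the ephemeral node /rpc/com.example.EchoService/1.0.0/192.168.1.10:8299:5, where the trailing :5 is the load-balancing weight.

ThriftServerAddressRegister register = new ThriftServerAddressRegister(zkClient);
register.register("com.example.EchoService", "1.0.0", "192.168.1.10:8299:5");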
3. Client-side service discovery: define the interface for obtaining service addresses
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import java.net.InetSocketAddress;
import java.util.*;

/**
 * Provides thrift server addresses so that the client connection pool can be built.
 */
public class ThriftServerAddressProvider {

    // registered service name
    private String service;

    // service version
    private String version = "1.0.0";

    private PathChildrenCache cachedPath;

    private CuratorFramework zkClient;

    // Records every address this provider has ever seen; when the zookeeper
    // cluster fails, the traced addresses can serve as a "backup".
    private Set<String> trace = new HashSet<>();

    private final List<InetSocketAddress> container = new ArrayList<>();
    private Queue<InetSocketAddress> inner = new LinkedList<>();

    private Object lock = new Object();

    private static final Integer DEFAULT_WEIGHT = 1; // default weight

    public void setService(String service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public ThriftServerAddressProvider() {
    }

    public ThriftServerAddressProvider(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void init() throws Exception {
        // start the zk client if it has not been started yet
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildPathChildrenCache(zkClient, getServicePath(), true);
        cachedPath.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    }

    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        System.out.println("Connection is reconnected.");
                        break;
                    case CONNECTION_SUSPENDED:
                        System.out.println("Connection is suspended.");
                        break;
                    case CONNECTION_LOST:
                        System.out.println("Connection error, waiting...");
                        return;
                    default:
                        break;
                }
                // Any actual data change on any node triggers a full rebuild; this is the "simple" approach.
                cachedPath.rebuild();
                rebuild();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // Possibly all thrift servers have lost their connection to zookeeper,
                    // while the network between thrift client and thrift server may still
                    // be fine, so whether to clear the container here deserves careful
                    // consideration.
                    container.clear();
                    System.out.println("thrift server-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                String path = null;
                for (ChildData data : children) {
                    path = data.getPath();
                    System.out.println("get path:" + path);
                    path = path.substring(getServicePath().length() + 1);
                    System.out.println("get serviceAddress:" + path);
                    String address = new String(path.getBytes(), "utf-8");
                    current.addAll(transfer(address));
                    trace.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);
                }
            }
        });
    }

    // Expand "ip:port:weight" into `weight` copies of the address; shuffling the
    // combined list and polling it then yields weighted random load balancing.
    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        Integer weight = DEFAULT_WEIGHT;
        if (hostname.length == 3) {
            weight = Integer.valueOf(hostname[2]);
        }
        String ip = hostname[0];
        Integer port = Integer.valueOf(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < weight; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    public List<InetSocketAddress> findServerAddressList() {
        return Collections.unmodifiableList(container);
    }

    public synchronized InetSocketAddress selector() {
        if (inner.isEmpty()) {
            if (!container.isEmpty()) {
                inner.addAll(container);
            } else if (!trace.isEmpty()) {
                synchronized (lock) {
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
        }
        return inner.poll();
    }

    public void close() {
        try {
            cachedPath.close();
            zkClient.close();
        } catch (Exception e) {
        }
    }

    public String getService() {
        return service;
    }
}

Here, zkClient watches for node changes under the service registration path; whenever anything changes, the service list is updated immediately.
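The weight mechanism is worth a concrete example (addresses hypothetical): transfer() expands the child node name "10.0.0.1:9090:3" into three identical InetSocketAddress entries and "10.0.0.2:9090:1" into one, so after shuffling, selector() hands out the first server roughly three quarters of the time. A minimal wiring sketch under those assumptions:

// Sketch only; "com.example.EchoService" is a hypothetical Thrift-generated class name.
ThriftServerAddressProvider provider = new ThriftServerAddressProvider(zkClient);
provider.setService("com.example.EchoService");
provider.setVersion("1.0.0");
provider.init(); // starts the PathChildrenCache watching /com.example.EchoService/1.0.0
InetSocketAddress next = provider.selector(); // weighted-random pick, as used by the pool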
4. Client-side service proxy and connection pool implementation
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.net.InetSocketAddress;

/**
 * Pooled-object factory for thrift clients (commons-pool2; intended for Spring wiring).
 */
public class ThriftClientPoolFactory implements PooledObjectFactory<TServiceClient> {

    private final ThriftServerAddressProvider serverAddressProvider;
    private final TServiceClientFactory<TServiceClient> clientFactory;
    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,
                                      TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,
                                      TServiceClientFactory<TServiceClient> clientFactory,
                                      PoolOperationCallBack callback) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    static interface PoolOperationCallBack {
        // invoked before a client is destroyed
        void destroy(TServiceClient client);

        // invoked after a client is successfully created
        void make(TServiceClient client);
    }

    @Override
    public PooledObject<TServiceClient> makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000); // 10000 ms timeout
        // TFramedTransport here must match the server's TFramedTransport.Factory
        TTransport transport = new TFramedTransport(tsocket);
        TProtocol protocol = new TBinaryProtocol(transport);
        TServiceClient client = this.clientFactory.getClient(protocol);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
            }
        }
        return new DefaultPooledObject<>(client);
    }

    @Override
    public void destroyObject(PooledObject<TServiceClient> p) throws Exception {
        TServiceClient client = p.getObject();
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
            }
        }
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
        TTransport pout = client.getOutputProtocol().getTransport();
        pout.close();
    }

    @Override
    public boolean validateObject(PooledObject<TServiceClient> p) {
        TServiceClient client = p.getObject();
        TTransport pin = client.getInputProtocol().getTransport();
        TTransport pout = client.getOutputProtocol().getTransport();
        return pin.isOpen() && pout.isOpen();
    }

    @Override
    public void activateObject(PooledObject<TServiceClient> p) throws Exception {
    }

    @Override
    public void passivateObject(PooledObject<TServiceClient> p) throws Exception {
    }
}
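The PoolOperationCallBack hooks are also a natural place for connection accounting. A small hypothetical variant (my own illustration; it must live in the same package as the factory, since the interface is package-private, and wiring it in would need a setter that the proxy factory below does not currently expose):

import java.util.concurrent.atomic.AtomicInteger;

ThriftClientPoolFactory.PoolOperationCallBack countingCallback =
        new ThriftClientPoolFactory.PoolOperationCallBack() {
            private final AtomicInteger live = new AtomicInteger();

            @Override
            public void make(TServiceClient client) {
                System.out.println("connections open: " + live.incrementAndGet());
            }

            @Override
            public void destroy(TServiceClient client) {
                System.out.println("connections open: " + live.decrementAndGet());
            }
        };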
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;

/**
 * Client-side proxy factory: builds the client connection pool.
 */
public class ThriftServiceClientProxyFactory {

    private ThriftServerAddressProvider serverAddressProvider;
    private GenericObjectPool<TServiceClient> pool;

    private ThriftClientPoolFactory.PoolOperationCallBack callback = new ThriftClientPoolFactory.PoolOperationCallBack() {
        @Override
        public void make(TServiceClient client) {
            System.out.println("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            System.out.println("destroy");
        }
    };

    public GenericObjectPool<TServiceClient> getPool() {
        return pool;
    }

    public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
        this.serverAddressProvider = serverAddressProvider;
    }

    @SuppressWarnings("unchecked")
    public void init() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // load the Iface interface (also validates the configured service class name)
        Class<?> objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // load the Client.Factory class
        Class<TServiceClientFactory<TServiceClient>> fi =
                (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(100);
        poolConfig.setMinIdle(100);
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setTestOnBorrow(false);
        poolConfig.setTestOnCreate(false);
        poolConfig.setTestWhileIdle(true);
        poolConfig.setTestOnReturn(false); // see the commons-pool2 documentation for the remaining parameters
        pool = new GenericObjectPool<>(clientPool, poolConfig);
    }

    public void close() {
        if (serverAddressProvider != null) {
            serverAddressProvider.close();
        }
    }
}
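One detail worth spelling out: because init() appends $Iface and $Client$Factory to the provider's service name, setService must receive the fully qualified name of the Thrift-generated class. For instance (names hypothetical):

// For a Thrift IDL declaring:  service EchoService { string echo(1: string msg) }
// compiled into package com.example, the generated class is com.example.EchoService,
// so the provider must be configured with that fully qualified name:
addressProvider.setService("com.example.EchoService");
// init() then loads com.example.EchoService$Iface and com.example.EchoService$Client$Factory,
// and the server side registers under the same name, so client and server agree.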
5. Server-side service registration
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

/**
 * Server-side factory: starts the thrift server and registers the service.
 */
public class ThriftServiceServerFactory {

    private String ip = "127.0.0.1";
    private Integer port = 8299;
    // load-balancing weight
    private Integer weight = 1; // default
    // the service implementation instance
    private Object service;
    // service version
    private String version;
    private ServerThread serverThread;
    // service registry
    private ThriftServerAddressRegister thriftServerAddressRegister;

    // This setter was missing from the original listing; without it the default
    // 127.0.0.1 would be registered, which remote clients cannot reach.
    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    // Also missing from the original listing; without it the register field can
    // never be populated and registration is silently skipped.
    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    public void init() throws Exception {
        String hostname = ip + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // the interfaces implemented by the service class
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implement Iface");
        }
        // reflectively load the generated "Processor" class
        TProcessor processor = null;
        String serviceName = null;
        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implement Iface");
        }
        // serve() blocks, so the server needs its own thread
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // register the service
        if (thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }
    }

    class ServerThread extends Thread {
        private TServer server;

        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run() {
            try {
                // start serving
                server.serve();
            } catch (Exception e) {
                //
            }
        }

        public void stopServer() {
            server.stop();
        }
    }

    public void close() {
        serverThread.stopServer();
    }
}
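The original post only shows a client demo; a matching server-side wiring sketch (untested; EchoServiceImpl is a hypothetical implementation of the Thrift-generated EchoService.Iface, addresses are placeholders) would look roughly like this:

ZookeeperFactory zkFactory = new ZookeeperFactory();
zkFactory.setZkHosts("127.0.0.1:2181"); // placeholder ensemble address

ThriftServerAddressRegister register = new ThriftServerAddressRegister(zkFactory.getZkClient());

ThriftServiceServerFactory serverFactory = new ThriftServiceServerFactory();
serverFactory.setIp("192.168.1.10"); // must be reachable by clients, not 127.0.0.1
serverFactory.setPort(8299);
serverFactory.setWeight(5);
serverFactory.setVersion("1.0.0");
serverFactory.setService(new EchoServiceImpl());
serverFactory.setThriftServerAddressRegister(register);
serverFactory.init(); // starts the server thread and registers the ephemeral node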
6. Client demo (illustrative; not actually run)
CuratorFramework zkClientServer = new ZookeeperFactory().getZkClient(); // zkHosts etc. must be configured first
ThriftServerAddressProvider addressProvider = new ThriftServerAddressProvider(zkClientServer);
addressProvider.setService(""); // ... set the service name and other properties
addressProvider.init();

ThriftServiceClientProxyFactory clientProxyFactory = new ThriftServiceClientProxyFactory();
clientProxyFactory.setServerAddressProvider(addressProvider);
clientProxyFactory.init();

GenericObjectPool<TServiceClient> thriftClientPool = clientProxyFactory.getPool();
TServiceClient thriftClient = thriftClientPool.borrowObject();
// ... invoke the service ...
thriftClientPool.returnObject(thriftClient);
// thriftClientPool.invalidateObject(thriftClient);
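One caution of my own (not in the original post): a borrowed client should always go back to the pool, and a client whose call failed should be invalidated rather than returned, since its connection may be unusable. A sketch, assuming the surrounding method declares throws Exception:

TServiceClient client = thriftClientPool.borrowObject();
boolean broken = false;
try {
    // ... invoke the service through `client` ...
} catch (Exception e) {
    broken = true; // the underlying connection may be dead
    throw e;
} finally {
    if (broken) {
        thriftClientPool.invalidateObject(client); // destroy instead of returning
    } else {
        thriftClientPool.returnObject(client);
    }
}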
Possible improvements:

thriftClientPool's random selection does implement load balancing, but consider a machine whose performance degrades for a while (it handles fewer requests per second than its peers) and starts causing timeouts. The usual reaction is to close the offending client in the pool. As the code shows, though, the pool may hold more than one client for that machine, and GenericObjectPool periodically checks whether the number of pooled objects matches the GenericObjectPoolConfig settings; whenever the pool runs short, it calls ThriftServerAddressProvider.selector() to pick a server address and create a new connection to refill the pool, quite possibly to the same degraded machine.

An improvement: when a client misbehaves, report its server address and lower that address's weight in ThriftServerAddressProvider's List<InetSocketAddress>, for example cutting its share to under 10% of the other addresses' (or even less). This indirectly reduces the QPS sent to the struggling server and avoids repeatedly destroying and re-creating connections. A recovery mechanism is also needed, so that once the machine recovers, its QPS can be raised back up.

One idea: wrap thriftClientPool so that when a connection is about to be destroyed, its server address is extracted and reported back to ThriftServerAddressProvider; if GenericObjectPool then stops using that address when refilling the pool, the degraded server's QPS should drop. The pool itself also needs rebuilding periodically, both to prevent some machines from carrying permanently elevated QPS (because they keep filling in for the problem machine) and to give recovery a chance. Concretely: on a schedule, destroy every client as it is returned to the pool; GenericObjectPool then re-creates clients, polling server addresses one by one from the Queue<InetSocketAddress>. Since that queue is built from the List<InetSocketAddress>, it suffices to adjust the list, e.g. further lowering the weight of addresses that misbehaved during the previous cycle, or restoring them, as the business requires. A rough sketch of this feedback idea follows.
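A minimal sketch of the weight-feedback idea (entirely my own illustration, not from the original post; reportFailure/restoreWeights and the halving/step-up policy are hypothetical):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that a wrapped pool could consult when refilling:
// failed addresses get their effective weight decayed, and a periodic task
// steps weights back up so that recovered machines regain traffic.
public class WeightFeedback {

    private final Map<String, Integer> effectiveWeight = new ConcurrentHashMap<>();

    // called from destroyObject() when a client is invalidated after a timeout
    public void reportFailure(String address, int baseWeight) {
        // halve the weight, but keep at least 1 so the server is still probed
        effectiveWeight.merge(address, Math.max(1, baseWeight / 2),
                (old, unused) -> Math.max(1, old / 2));
    }

    // called by a periodic "pool rebuild" task (e.g. a ScheduledExecutorService)
    public void restoreWeights(Map<String, Integer> baseWeights) {
        // step each decayed weight back toward its configured base weight
        effectiveWeight.replaceAll((addr, w) ->
                Math.min(baseWeights.getOrDefault(addr, 1), w + 1));
    }

    // consulted by transfer() instead of the raw "ip:port:weight" value
    public int weightOf(String address, int baseWeight) {
        return effectiveWeight.getOrDefault(address, baseWeight);
    }
}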
These are only my own thoughts; I welcome discussion of anything I have missed.