[Repost] Integrating Thrift-client with Spring

2015-10-28 23:47
Reposted from http://shift-alt-ctrl.iteye.com/blog/1990030?utm_source=tuicool&utm_medium=referral

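As a service consumer, Thrift-client communicates with the server over sockets, so it has to deal with several issues:

1) The client needs to know the server's IP + port; in a distributed deployment it needs the IP + port list of all servers.

2) For performance, the client cannot serve concurrent requests over a single socket, nor should it create a new socket for every request; a connection pool is needed.

3) For Java developers, configuring Thrift services through Spring is a real convenience.

4) With ZooKeeper-based configuration management, the client does not need to hard-code the servers' IP + port; ZooKeeper can push the address of each service.

5) Without a connection pool, Thrift-client cannot handle concurrent calls efficiently, so this article focuses on how to use a Thrift-client connection pool.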

1. pom.xml

XML code

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.0.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.5</version>
        <!--<exclusions>-->
        <!--<exclusion>-->
        <!--<groupId>log4j</groupId>-->
        <!--<artifactId>log4j</artifactId>-->
        <!--</exclusion>-->
        <!--</exclusions>-->
    </dependency>
    <!--
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.4</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-pool</groupId>
        <artifactId>commons-pool</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>

2. spring-thrift-client.xml

ZooKeeper is optional here; developers can also point the client at a server directly by setting serverAddress.

XML code

<!-- fixed address -->
<!--
<bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
    <property name="service" value="com.demo.service.UserService"></property>
    <property name="serverAddress" value="127.0.0.1:9090:2"></property>
    <property name="maxActive" value="5"></property>
    <property name="idleTime" value="10000"></property>
</bean>
-->

<!-- zookeeper -->
<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
    <property name="connectString" value="127.0.0.1:2181"></property>
    <property name="namespace" value="demo/thrift-service"></property>
</bean>

<bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
    <property name="service" value="com.demo.service.UserService"></property>
    <property name="maxActive" value="5"></property>
    <property name="idleTime" value="1800000"></property>
    <property name="addressProvider">
        <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
            <property name="configPath" value="UserServiceImpl"></property>
            <property name="zookeeper" ref="thriftZookeeper"></property>
        </bean>
    </property>
</bean>
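
The configuration refers to a ThriftServerAddressProvider and, for the fixed-address case, a FixedAddressProvider, neither of which is listed in this article (they are in the attachment). The following is only a sketch of what such a pair might look like, inferred from the three methods that DynamicAddressProvider overrides (getAll, selector, close). The type names AddressProviderSketch and FixedProviderSketch, the comma-separated multi-address format, and reading the third field as a weight are all assumptions, not the author's code.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the article's ThriftServerAddressProvider interface.
interface AddressProviderSketch {
    List<InetSocketAddress> getAll();
    InetSocketAddress selector();
    void close();
}

// Hypothetical fixed-address provider: parses comma-separated "ip:port[:weight]" entries.
class FixedProviderSketch implements AddressProviderSketch {
    private final List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
    private final AtomicInteger cursor = new AtomicInteger();

    FixedProviderSketch(String serverAddress) {
        for (String entry : serverAddress.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length < 2 || parts.length > 3) {
                throw new IllegalArgumentException("expected ip:port[:weight], got: " + entry);
            }
            int port = Integer.parseInt(parts[1]);
            int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
            if (weight < 1) {
                throw new IllegalArgumentException("weight must be >= 1, got: " + entry);
            }
            // A weight of n lists the address n times, so it is picked n times per round.
            for (int i = 0; i < weight; i++) {
                addresses.add(InetSocketAddress.createUnresolved(parts[0], port));
            }
        }
    }

    @Override
    public List<InetSocketAddress> getAll() {
        return Collections.unmodifiableList(addresses);
    }

    @Override
    public InetSocketAddress selector() {
        // Round-robin; the mask keeps the index non-negative after int overflow.
        int index = (cursor.getAndIncrement() & Integer.MAX_VALUE) % addresses.size();
        return addresses.get(index);
    }

    @Override
    public void close() {
        // Nothing to release for a fixed list.
    }
}
```

With the value from the commented-out bean, new FixedProviderSketch("127.0.0.1:9090:2") would hold the same address twice, which only matters once a second server with a different weight is added.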

3. ThriftServiceClientProxyFactory.java

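Because the client side uses a connection pool, calls to client methods have to be proxied. This class maintains a proxy for the "Client": on each method call it takes a "Client" object from the object pool and returns it to the pool once the actual call has finished.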

Java code

package com.demo.thrift;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.transport.TTransportException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

import com.demo.thrift.ThriftClientPoolFactory.PoolOperationCallBack;
// ThriftServerAddressProvider and FixedAddressProvider come from the project itself (see the attachment)

@SuppressWarnings({"rawtypes", "unchecked"})
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

    private String service;
    private String serverAddress;
    private Integer maxActive = 32; // maximum number of active connections
    // Idle time of a connection in ms, default 3 min;
    // -1 disables idle eviction
    private Integer idleTime = 180000;
    private ThriftServerAddressProvider addressProvider;
    private Object proxyClient;
    private Class<?> objectClass;
    private GenericObjectPool<TServiceClient> pool;

    private PoolOperationCallBack callback = new PoolOperationCallBack() {
        @Override
        public void make(TServiceClient client) {
            System.out.println("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            System.out.println("destroy");
        }
    };

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public void setIdleTime(Integer idleTime) {
        this.idleTime = idleTime;
    }

    public void setService(String service) {
        this.service = service;
    }

    public void setServerAddress(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public void setAddressProvider(ThriftServerAddressProvider addressProvider) {
        this.addressProvider = addressProvider;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (serverAddress != null) {
            addressProvider = new FixedAddressProvider(serverAddress);
        }
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // Load the generated Iface interface
        objectClass = classLoader.loadClass(service + "$Iface");
        // Load the generated Client.Factory class
        Class<TServiceClientFactory<TServiceClient>> fi =
                (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(service + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
        proxyClient = Proxy.newProxyInstance(classLoader, new Class<?>[]{objectClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                TServiceClient client = pool.borrowObject();
                boolean broken = false;
                try {
                    return method.invoke(client, args);
                } catch (InvocationTargetException e) {
                    // Unwrap so that callers see the exception the service method actually threw
                    Throwable cause = e.getTargetException();
                    // After a transport failure the socket must not go back into the pool
                    broken = cause instanceof TTransportException;
                    throw cause;
                } finally {
                    if (broken) {
                        pool.invalidateObject(client);
                    } else {
                        pool.returnObject(client);
                    }
                }
            }
        });
    }

    @Override
    public Object getObject() throws Exception {
        return proxyClient;
    }

    @Override
    public Class<?> getObjectType() {
        return objectClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    public void close() {
        if (pool != null) {
            try {
                pool.close(); // closes the idle connections held by the pool
            } catch (Exception e) {
                // ignore, we are shutting down
            }
        }
        if (addressProvider != null) {
            addressProvider.close();
        }
    }
}
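
The borrow, invoke, return cycle described for this class can be tried without Thrift or commons-pool. The sketch below uses only the JDK: a BlockingQueue plays the role of the object pool, and EchoIface / EchoClient are hypothetical stand-ins for a generated Iface and Client. It illustrates the proxy pattern only and is not a replacement for the class in this section.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical service interface standing in for a generated Thrift Iface.
interface EchoIface {
    String echo(String message);
}

// Hypothetical client standing in for a generated Thrift Client bound to one connection.
class EchoClient implements EchoIface {
    private final int id;

    EchoClient(int id) {
        this.id = id;
    }

    @Override
    public String echo(String message) {
        if (message == null) {
            throw new IllegalArgumentException("message is null");
        }
        return "client-" + id + ":" + message;
    }
}

class PooledProxySketch {
    // Builds a proxy that borrows a client for each call and gives it back afterwards.
    static EchoIface proxyFor(final BlockingQueue<EchoClient> pool) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                EchoClient client = pool.take(); // blocks while every client is in use
                try {
                    return method.invoke(client, args);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException(); // surface the exception the client threw
                } finally {
                    pool.add(client); // always hand the client back
                }
            }
        };
        return (EchoIface) Proxy.newProxyInstance(
                EchoIface.class.getClassLoader(), new Class<?>[]{EchoIface.class}, handler);
    }

    public static void main(String[] args) {
        BlockingQueue<EchoClient> pool = new ArrayBlockingQueue<EchoClient>(2);
        pool.add(new EchoClient(1));
        pool.add(new EchoClient(2));
        EchoIface service = proxyFor(pool);
        System.out.println(service.echo("hello"));
        System.out.println("clients back in pool: " + pool.size());
    }
}
```

Callers only ever see EchoIface, so the pooling stays invisible to them; this is the same reason the Spring bean exposes the Iface type through getObjectType().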

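4. ThriftClientPoolFactory.java

The "Client" object pool. The pool holds Client objects that have already been instantiated, and each Client object is responsible for communicating with the Thrift server.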

Java code

package com.demo.thrift;

import java.net.InetSocketAddress;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
// ThriftServerAddressProvider comes from the project itself (see the attachment)

/**
 * Connection pool, thrift-client for spring
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

    private final ThriftServerAddressProvider addressProvider;
    private final TServiceClientFactory<TServiceClient> clientFactory;
    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,
            TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this(addressProvider, clientFactory, null);
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,
            TServiceClientFactory<TServiceClient> clientFactory, PoolOperationCallBack callback) throws Exception {
        this.addressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    @Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = addressProvider.selector();
        if (address == null) {
            // selector() returns null when no server address is known
            throw new IllegalStateException("No thrift server address available");
        }
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
        TProtocol protocol = new TBinaryProtocol(tsocket);
        TServiceClient client = this.clientFactory.getClient(protocol);
        tsocket.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                // a failing callback must not prevent the client from being used
            }
        }
        return client;
    }

    @Override
    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                // a failing callback must not prevent the socket from being closed
            }
        }
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
    }

    @Override
    public boolean validateObject(TServiceClient client) {
        TTransport pin = client.getInputProtocol().getTransport();
        return pin.isOpen();
    }

    static interface PoolOperationCallBack {
        // Called before a client is destroyed
        void destroy(TServiceClient client);

        // Called after a client has been created successfully
        void make(TServiceClient client);
    }
}
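
The three hooks overridden above (makeObject, validateObject, destroyObject) are the whole contract between a pool and its factory. The sketch below replays that contract with JDK classes only; FakeConnection, ConnectionFactorySketch and TinyPoolSketch are hypothetical names, and the pool always validates on borrow, which is an assumption of this sketch. In commons-pool 1.6, validateObject is only called when testOnBorrow, testOnReturn or testWhileIdle is enabled, and the configuration in section 3 sets none of them.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical connection standing in for a Thrift client plus its socket.
class FakeConnection {
    private boolean open = true;

    boolean isOpen() {
        return open;
    }

    void close() {
        open = false;
    }
}

// The three lifecycle hooks a pool needs from its factory.
interface ConnectionFactorySketch {
    FakeConnection make();

    boolean validate(FakeConnection connection);

    void destroy(FakeConnection connection);
}

class SocketLikeFactory implements ConnectionFactorySketch {
    @Override
    public FakeConnection make() {
        return new FakeConnection();
    }

    @Override
    public boolean validate(FakeConnection connection) {
        return connection.isOpen();
    }

    @Override
    public void destroy(FakeConnection connection) {
        connection.close();
    }
}

// A deliberately small pool; commons-pool adds size limits, waiting and idle eviction.
class TinyPoolSketch {
    private final Deque<FakeConnection> idle = new ArrayDeque<FakeConnection>();
    private final ConnectionFactorySketch factory;
    int created;
    int destroyed;

    TinyPoolSketch(ConnectionFactorySketch factory) {
        this.factory = factory;
    }

    synchronized FakeConnection borrow() {
        // Discard idle connections that no longer validate, then fall back to a new one.
        FakeConnection candidate;
        while ((candidate = idle.pollFirst()) != null) {
            if (factory.validate(candidate)) {
                return candidate;
            }
            factory.destroy(candidate);
            destroyed++;
        }
        created++;
        return factory.make();
    }

    synchronized void giveBack(FakeConnection connection) {
        idle.addFirst(connection);
    }
}
```

If a connection is closed while it sits idle, the next borrow() destroys it and creates a fresh one instead of handing a dead connection to the caller.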

5. DynamicAddressProvider.java

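ZooKeeper acts as the provider of server addresses, so the client does not need to list a pile of ip + port entries in its configuration file, and it does not need to be reconfigured when the server side changes.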

Java code

package com.demo.thrift.support.impl;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.springframework.beans.factory.InitializingBean;
// ThriftServerAddressProvider comes from the project itself (see the attachment)

/**
 * Obtains server addresses dynamically. Possible designs:
 * 1) poll a web service periodically to fetch the addresses
 * 2) use an event-listener mechanism and receive the latest addresses passively (e.g. based on MQ, nio)
 * 3) use the zookeeper watcher mechanism to obtain the latest addresses
 * <p/>
 * This example uses zookeeper as the "config" center and the apache-curator library to simplify zookeeper development.
 * The implementation below is for reference only.
 */
public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {

    private static final int DEFAULT_PRIORITY = 1;

    private String configPath;
    private PathChildrenCache cachedPath;
    private CuratorFramework zookeeper;

    // Every address this provider has ever seen;
    // used as a "backup" when the zookeeper cluster fails
    private final Set<String> trace = new HashSet<String>();
    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
    private final Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
    // Guards trace, container and inner
    private final Object lock = new Object();

    public void setConfigPath(String configPath) {
        this.configPath = configPath;
    }

    public void setZookeeper(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Start the zookeeper client if it has not been started yet
        if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
            zookeeper.start();
        }
        buildPathChildrenCache(zookeeper, configPath, true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    }

    private void buildPathChildrenCache(CuratorFramework client, String path, boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    // case CONNECTION_RECONNECTED:
                    //     break;
                    case CONNECTION_SUSPENDED:
                    case CONNECTION_LOST:
                        System.out.println("Connection error,waiting...");
                        return;
                    default:
                        break;
                }
                // Any change to the data of any child node triggers a full rebuild; this is the "simple" approach.
                cachedPath.rebuild();
                rebuild();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // All thrift servers may have lost their connection to zookeeper
                    // while the network between thrift client and thrift server is still fine,
                    // so whether the container should be cleared here deserves careful thought.
                    synchronized (lock) {
                        container.clear();
                    }
                    System.out.println("thrift server-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                List<String> seen = new ArrayList<String>();
                for (ChildData data : children) {
                    String address = new String(data.getData(), "utf-8");
                    current.addAll(transfer(address));
                    seen.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    trace.addAll(seen);
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);
                }
            }
        });
    }

    // Parses "ip:port[:priority]" and returns the address repeated "priority" times
    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        int priority = DEFAULT_PRIORITY;
        if (hostname.length == 3) {
            priority = Integer.parseInt(hostname[2]);
        }
        String ip = hostname[0];
        int port = Integer.parseInt(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < priority; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    @Override
    public List<InetSocketAddress> getAll() {
        return Collections.unmodifiableList(container);
    }

    @Override
    public InetSocketAddress selector() {
        // Same lock as rebuild(), which runs on the Curator listener thread
        synchronized (lock) {
            if (inner.isEmpty()) {
                if (!container.isEmpty()) {
                    inner.addAll(container);
                } else if (!trace.isEmpty()) {
                    // zookeeper has no live address: fall back to the addresses seen earlier
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
            return inner.poll(); // null if no address is known at all
        }
    }

    @Override
    public void close() {
        try {
            cachedPath.close();
            zookeeper.close();
        } catch (Exception e) {
            // ignore, we are shutting down
        }
    }
}
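
The selection logic in this class is independent of ZooKeeper: each address is repeated according to its priority, the copies are handed out from a queue, and the queue is refilled from the full list when it runs dry. The sketch below isolates that logic using plain strings and JDK collections. WeightedSelectorSketch and its method names are hypothetical, and the shuffle is left out on purpose so that the order is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class WeightedSelectorSketch {
    private final Object lock = new Object();
    private final List<String> container = new ArrayList<String>(); // current weighted list
    private final Queue<String> inner = new LinkedList<String>();   // addresses left in this round

    // Expands "host:port[:weight]" into weight copies of "host:port" (weight defaults to 1).
    static List<String> expand(String address) {
        String[] parts = address.split(":");
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
        List<String> result = new ArrayList<String>();
        for (int i = 0; i < weight; i++) {
            result.add(parts[0] + ":" + parts[1]);
        }
        return result;
    }

    // Replaces the address list, as a listener would after the registry changes.
    void update(List<String> addresses) {
        List<String> current = new ArrayList<String>();
        for (String address : addresses) {
            current.addAll(expand(address));
        }
        synchronized (lock) {
            container.clear();
            container.addAll(current);
            inner.clear();
            inner.addAll(current);
        }
    }

    // Takes the next address and starts a new round when the queue is empty.
    String select() {
        synchronized (lock) {
            if (inner.isEmpty()) {
                inner.addAll(container);
            }
            return inner.poll(); // null when no address is known
        }
    }
}
```

Over any whole number of rounds an address with weight 2 is returned twice as often as one with weight 1. Both update() and select() take the same lock, so a reader never observes a half-replaced list.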

At this point, our Thrift client is basically ready to run. For more code, see the attachment.

For Thrift-server development and configuration, see [Thrift-server]