您的位置:首页 > 运维架构 > Apache

Apache Thrift 初学小讲(四)【用Apache Commons Pool实现连接池】

2017-01-10 01:07 211 查看

Apache Thrift本身没有提供连接池,我们可以用Apache Commons Pool来实现一个,Apache commons-pool本质上是"对象池",即通过一定的规则来维护对象集合的容器;commos-pool在很多场景中,用来实现连接池/任务worker池等,大家常用的dbcp数据库连接池,也是基于commons-pool实现的。TCP连接池里的是Socket对象,那么对于thirft连接池里的就是TTransport对象了。

 

参考了这篇文章:http://www.open-open.com/lib/view/open1415453575730.html,但上官网http://commons.apache.org/proper/commons-pool/download_pool.cgi下载jar时,发现官网上的例子的用法和上面那篇文章的例子的用法有点出入,才发现现在最新版是2.4.2,是重构过的,连包名pool都改成了pool2,不向下兼容的。官网poo2的例子:http://commons.apache.org/proper/commons-pool/examples.html

 

据说poo2显著的提升了性能和可伸缩性,特别是在高并发加载的情况下。2.0 版本包含可靠的实例跟踪和池监控,要求 JDK 1.6 或者更新版本。poo2使用了泛型,不用再把object强制转换为你的对象类型了。

 

简单使用时主要涉及几个类:

GenericObjectPool:对象池类,可以获取和归还一个对象PooledObject等;

BasePooledObjectFactory:对象池GenericObjectPool里管理的对象的工厂类,池可以通过此工厂类创建和销毁一个对象PooledObject等;

GenericObjectPoolConfig:对象池GenericObjectPool的配置类,例如最大连接数,最大和最小空闲连接数等;

 

查看GenericObjectPool的构造方法,刚好两个参数分别是BasePooledObjectFactory和GenericObjectPoolConfig,而BasePooledObjectFactory是需要我们继承来实现相应的方法的,简单点实现两个抽象方法create和wrap即可,而wrap用默认实现new一个DefaultPooledObject就可以了。所以其实我们只需要实现create方法就行了。

 

有没有发现poo1是通过实现makeObject方法来创建池对象的,而poo2是create方法,同时poo2也有一个

makeObject方法,好像有点多余或者矛盾,我们查看poo2的BasePooledObjectFactory的源码发现其实poo2也是通过makeObject来创建一个池对象的,只不过里面依次调用了create和wrap,create了一个你的对象后,再把这个对象wrap成池管理类型的对象PooledObject。

 

源码:org.apache.commons.pool2.BasePooledObjectFactory-》makeObject:

package org.apache.commons.pool2;

public abstract class BasePooledObjectFactory<T> implements PooledObjectFactory<T> {
public abstract T create() throws Exception;
public abstract PooledObject<T> wrap(T obj);

@Override
public PooledObject<T> makeObject() throws Exception {
return wrap(create());
}

@Override
public void destroyObject(PooledObject<T> p)
throws Exception  {
}
@Override
public boolean validateObject(PooledObject<T> p) {
return true;
}
@Override
public void activateObject(PooledObject<T> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<T> p)
throws Exception {
}
}

 

关门,放代码:

连接池工厂类ConnectionPoolFactory.java:

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class ConnectionPoolFactory {
private GenericObjectPool<TTransport> pool;

public ConnectionPoolFactory(GenericObjectPoolConfig config, String ip, int port) {
ConnectionFactory objFactory = new ConnectionFactory(ip, port, 3000);
pool = new GenericObjectPool<TTransport>(objFactory, config);
}

//从池里获取一个Transport对象
public TTransport getConnection() throws Exception {
return pool.borrowObject();
}

//把一个Transport对象归还到池里
public void releaseConnection(TTransport transport) {
pool.returnObject(transport);
}

/*
* 连接池管理的对象Transport的工厂类,
* GenericObjectPool会使用此类的create方法来
* 创建Transport对象并放进pool里进行管理等操作。
*/
class ConnectionFactory extends BasePooledObjectFactory<TTransport> {
private String ip;
private int port;
private int socketTimeout;

public ConnectionFactory(String ip, int port, int socketTimeout) {
this.ip = ip;
this.port = port;
this.socketTimeout = socketTimeout;
}

//创建TTransport类型对象方法
@Override
public TTransport create() throws Exception {
TSocket socket = new TSocket(ip, port);
socket.setTimeout(socketTimeout);
TTransport transport = socket;
transport = new TFastFramedTransport(transport);
if ( !transport.isOpen() ) {
transport.open();
}
return transport;
}

//把TTransport对象打包成池管理的对象PooledObject<TTransport>
@Override
public PooledObject<TTransport> wrap(TTransport transport) {
return new DefaultPooledObject<TTransport>(transport);
}

}
}

 

使用连接池的客户端类TestPoolClient.java:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;

import thrift.test.ThriftTest;
import thrift.test.User;

public class TestPoolClient {
public static void main(String [] args) throws Exception {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
//链接池中最大连接数,默认为8
config.setMaxTotal(2);
//当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位:毫秒;默认为-1,表示永不超时
config.setMaxWaitMillis(3000);
ConnectionPoolFactory poolFactory = new ConnectionPoolFactory(config, "127.0.0.1", 9090);
//从连接池取得指定的通信方式
TTransport transport = poolFactory.getConnection();

//指定的通信协议
TProtocol tProtocol = new TCompactProtocol(transport);
//ThriftTest.Client是生成的客户端代码
ThriftTest.Client testClient = new ThriftTest.Client(tProtocol);

//getUser就是ThriftTest.thrift所定义的接口
User user = testClient.getUser(2);
System.out.println("名字:"+ user.getName());

//归还transport到连接池
poolFactory.releaseConnection(transport);
}
}

 

运行TestServer.java,再运行TestPoolClient.java,结果:

(服务端打印)Starting the server on port 9090...

(客户端打印)名字:另外一个烟火

 

工程结构图:


 

附近src.rar是源码,感谢阅读!!!

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: