您的位置:首页 > 编程语言 > Java开发

用JAVA实现无等待数据库连接池

2010-11-18 15:26 176 查看
我们都知道数据库连接是一种有限和非常昂贵的应用资源,怎样对这些资源进行高效的管理,能有效的改善整个系统的性能和健壮性。数据库连接池正是针对这个问题而提出来的。

数据库连接负责分配、释放和管理数据库连接。使数据库连接可以重复利用,而不是用一次建立一次数据库连接。



基本思路

建立一个容器

每次到这个容器里得到连接,如果为空则建立一个新连接。

当连接使用完后归还给这个容器



这里就有二个难点

1. 容器必需是同步的,线程安全的。

2. 连接怎归还连接池



方案:

针对这二个难点,我们分别提出了二个解决方法

1.使用ConcurrentLinkedQueue实现先进先出队列

ConcurrentLinkedQueue无界线程安全队列介绍

这个类在java.util.concurrent包中,我们来看看官方是怎描述这个类的
一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素.此实现采用了有效的“无等待 (wait-free)”算法

2.动态代理实现连接归还连接池

大家也可以参考刘冬在IBM发表的文章

http://www.ibm.com/developerworks/cn/java/l-connpoolproxy/




接下来我们来看看整体代码



import java.io.PrintWriter;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.sql.Connection;

import java.sql.Driver;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.ConcurrentLinkedQueue;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.ReentrantLock;



import javax.sql.DataSource;



public class JavaGGDataSource implements DataSource {

//连接队列

private ConcurrentLinkedQueue<_Connection> connQueue = new ConcurrentLinkedQueue<_Connection>();

//存放所有连接容器

private List<_Connection> conns = new ArrayList<_Connection>();

private Driver driver = null;



private String jdbcUrl = null;

private String user = null;

private String password = null;

private int maxActive = -1;// -1为不限制连接数

private String driverClass = null;

private int timeout = 1000 * 60 * 60 * 4;// 默认为4小时,即4小时没有任何sql操作就把所有连接重新建立连接

private AtomicLong lastCheckout = new AtomicLong(System.currentTimeMillis());

private AtomicInteger connCount = new AtomicInteger();

//线程锁,主要用于新建连接和清空连接时

private ReentrantLock lock = new ReentrantLock();



public void closeAllConnection() {

}



/**

* 归还连接给连接池

*

* @param conn

*@date 2009-8-13

*@author eric.chan

*/

public void offerConnection(_Connection conn) {

connQueue.offer(conn);

}



@Override

public Connection getConnection() throws SQLException {

return getConnection(user, password);

}



/**

* 从池中得到连接,如果池中没有连接,则建立新的sql连接

*

* @param username

* @param password

* @author eric.chan

*/

@Override

public Connection getConnection(String username, String password)

throws SQLException {

checkTimeout();

_Connection conn = connQueue.poll();

if (conn == null) {

if (maxActive > 0 && connCount.get() >= maxActive) {

for (;;) {// 采用自旋方法 从已满的池中得到一个连接

conn = connQueue.poll();

if (conn != null)

break;

else

continue;

}

}

lock.lock();

try {

if (maxActive > 0 && connCount.get() >= maxActive) {

// 处理并发问题

return getConnection(username, password);

}

Properties info = new Properties();

info.put("user", username);

info.put("password", password);

Connection conn1 = loadDriver().connect(jdbcUrl, info);

conn = new _Connection(conn1, this);

int c = connCount.incrementAndGet();// 当前连接数加1

conns.add(conn);

System.out.println("info : init no. " + c + " connectioned");

} finally {

lock.unlock();

}

}

lastCheckout.getAndSet(System.currentTimeMillis());

return conn.getConnection();

}



/**

* 检查最后一次的连接时间

*

* @throws SQLException

*@date 2009-8-13

*@author eric.chan

*/

private void checkTimeout() throws SQLException {

long now = System.currentTimeMillis();

long lt = lastCheckout.get();

if ((now - lt) > timeout) {

_Connection conn = null;

lock.lock();

try {

if(connCount.get()==0)return;

while ((conn = connQueue.poll()) != null) {

System.out.println("connection " + conn + " close ");

conn.close();

conn = null;

}

for(_Connection con:conns){

con.close();

}

conns.clear();

System.out.println("info : reset all connections");

connCount.getAndSet(0);// 重置连接数计数器

lastCheckout.getAndSet(System.currentTimeMillis());

} finally {

lock.unlock();

}

}

}



/**

*

* @return

*@date 2009-8-13

*@author eric.chan

*/

private Driver loadDriver() {

if (driver == null) {

try {

driver = (Driver) Class.forName(driverClass).newInstance();

} catch (ClassNotFoundException e) {

System.out.println("error : can not find driver class " + driverClass);

} catch (Exception e) {

e.printStackTrace();

}

}

return driver;

}



@Override

public PrintWriter getLogWriter() throws SQLException {

return null;

}



@Override

public int getLoginTimeout() throws SQLException {

return 0;

}



@Override

public void setLogWriter(PrintWriter out) throws SQLException {

}



@Override

public void setLoginTimeout(int seconds) throws SQLException {

}



@Override

public boolean isWrapperFor(Class iface) throws SQLException {

throw new SQLException("no Implemented isWrapperFor method");

}



@Override

public T unwrap(Class iface) throws SQLException {

throw new SQLException("no Implemented unwrap method");

}



public String getJdbcUrl() {

return jdbcUrl;

}



public void setJdbcUrl(String jdbcUrl) {

this.jdbcUrl = jdbcUrl;

}



public String getUsername() {

return user;

}



public void setUsername(String user) {

this.user = user;

}



public String getPassword() {

return password;

}



public void setPassword(String password) {

this.password = password;

}



public String getDriverClass() {

return driverClass;

}



public void setDriverClass(String driverClass) {

this.driverClass = driverClass;

}



public int getTimeout() {

return timeout;

}



public void setTimeout(int timeout) {

this.timeout = timeout * 1000;

}



public void setMaxActive(int maxActive) {

this.maxActive = maxActive;

}



public int getMaxActive() {

return maxActive;

}

}



/**

* 数据连接的自封装 ,是java.sql.Connection的一个钩子,主要是处理close方法

*

* @author eric

*

*/

class _Connection implements InvocationHandler {

private final static String CLOSE_METHOD_NAME = "close";



private final Connection conn;

private final JavaGGDataSource ds;



_Connection(Connection conn, JavaGGDataSource ds) {

this.conn = conn;

this.ds = ds;

}



@Override

public Object invoke(Object proxy, Method method, Object[] args)

throws Throwable {

Object obj = null;

// 判断是否调用了close的方法,如果调用close方法则把连接置为无用状态

if (CLOSE_METHOD_NAME.equals(method.getName())) {

// 归还连接给连接池

ds.offerConnection(this);

} else {

// 运行非close的方法

obj = method.invoke(conn, args);

}

return obj;

}



public Connection getConnection() {

// 返回数据库连接conn的接管类,以便截住close方法

Connection conn2 = (Connection) Proxy.newProxyInstance(conn.getClass().getClassLoader(), new Class[] { Connection.class }, this);

return conn2;

}



public void close() throws SQLException {

// 调用真正的close方法,一但调用此方法就直接关闭连接

if(conn!=null&&!conn.isClosed())

conn.close();

}



}





_Connection类是一个私有类,主要实现一个对Connection动态代理的功能(有点象windows的钩子)

说白了就是实现调用connection.close方法时我们映射到另一个方法上.

呵呵,是不是好简单呢,代码没有多复杂。

这里有一个问题要说明一吓:如果设置的maxActive值小于当前线程总数,那么当并发非常大时会出现资源争夺情况,一吓子cpu就会提高不小,所以建议设为无限制,或大于线程总数的值。



接下来我们测试测试

开五十个线程,对同一个表进行select/insert 10000次操作,每次select/insert一条记录

代码如下:

public static void main(String[] args) {

JavaGGDataSource ds = new JavaGGDataSource();

ds.setDriverClass("com.mysql.jdbc.Driver");

ds.setJdbcUrl("jdbc:mysql://192.168.1.6:3306/test");

ds.setPassword("ps");

ds.setUsername("name");

ds.setTimeout(300);

// ds.setMaxActive(60);

for (int i = 0; i < 50; i++) {

new GG(ds).start();

}

}

class GG extends Thread {

JavaGGDataSource ds = null;

long l = System.currentTimeMillis();



public GG(JavaGGDataSource ds) {

this.ds = ds;

}

static String sql = "insert into testgg(col1,cols) values (?,?)";

static String selectsql = "select * from testgg where id=?";



public void run() {

for (int t = 0; t < 10000; t++) {

Connection conn = null;

try {

conn = ds.getConnection();

PreparedStatement ps = conn.prepareStatement(sql);

//以下为insert

ps.setInt(1, 133664);

ps.setString(2, "ddd");

ps.executeUpdate();

//以下为select

ResultSet rs=ps.executeQuery();

while(rs.next()){

rs.getInt("id");

rs.getInt("col1");

}

rs.close();

ps.close();

} catch (SQLException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} finally {

try {

if (conn != null) {

// ds.offerConnection(conn);

conn.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

System.out.println(System.currentTimeMillis() - l);

}



测试结果

50个线程select 10000*50次结果

Javaggds 406à2156毫秒 连接池有50个连接(和线程数一样)

C3p0 1235à1657毫秒 连接池有500个连接(和设置的最大连接数一样 )

50个线程insert 10000*50次结果

Javaggds 39125à52734 连接池有50个连接(和线程数一样)

C3p0 60000à65141毫秒 连接池有500个连接(和设置的最大连接数一样 )



测试分析:

c3p0是使用同锁或同步块对连接池进行同步的,所以它的时间会控制在一定范围之内

但带来的问题是线程竞争和线程等待。

Javaggds是使用了concurrent包中的无等待算法队列,这个同步是在cpu层面上做的,所以同步的块非常小,大家有兴趣可以看看CAS同步算法。



Hibernate结合

编辑hibernate 加入/修改配置为

<property name="connection.provider_class">

com.javagg.datasource.DataSourceConnectionProvider<property>

<property name="db.driverClass">com.mysql.jdbc.Driver<property>

<property name="db.jdbcUrl">jdbc:mysql://192.168.1.6:3306/test<property>

<property name="db.username">name<property>

<property name="db.password">password<property>



<property name="db.datasource">

com.javagg.datasource.JavaGGDataSource<property>

<property name="db.maxActive">-1<property>< 无限制连接数 >

<property name="db.timeout">3600<property>< 一小时timeout 单位为秒 >



DataSourceConnectonProvider代码如下:



import java.lang.reflect.Method;

import java.sql.Connection;

import java.sql.SQLException;

import java.util.Iterator;

import java.util.Properties;



import javax.sql.DataSource;



import org.apache.commons.beanutils.BeanUtils;

import org.hibernate.HibernateException;

import org.hibernate.connection.ConnectionProvider;



public class DataSourceConnectionProvider implements ConnectionProvider {



private final static String BASE_KEY = "db.";

private final static String DATASOURCE_KEY = "db.datasource";



protected DataSource dataSource;



/*

* (non-Javadoc)

*

* @see

* org.hibernate.connection.ConnectionProvider#configure(java.util.Properties

* )

*/

public void configure(Properties props) throws HibernateException {

initDataSource(props);

}



/*

* (non-Javadoc)

*

* @see org.hibernate.connection.ConnectionProvider#getConnection()

*/

public Connection getConnection() throws SQLException {

return dataSource.getConnection();

}



/*

* (non-Javadoc)

*

* @see

* org.hibernate.connection.ConnectionProvider#closeConnection(java.sql.

* Connection)

*/

public void closeConnection(Connection conn) throws SQLException {

if (conn != null)

conn.close();

}



/*

* (non-Javadoc)

*

* @see org.hibernate.connection.ConnectionProvider#close()

*/

public void close() throws HibernateException {

if (dataSource != null)

try {

Method mClose = dataSource.getClass().getMethod("close");

mClose.invoke(dataSource);

} catch (Exception e) {

throw new HibernateException(e);

}

dataSource = null;

}



/*

* (non-Javadoc)

*

* @see

* org.hibernate.connection.ConnectionProvider#supportsAggressiveRelease()

*/

public boolean supportsAggressiveRelease() {

return false;

}



/**

* Initialize the datasource

*

* @param props

* @throws HibernateException

*/

protected void initDataSource(Properties props) throws HibernateException {

String dataSourceClass = null;

Properties new_props = new Properties();

Iterator keys = props.keySet().iterator();

while (keys.hasNext()) {

String key = (String) keys.next();

if (key.equals(DATASOURCE_KEY)) {

dataSourceClass=props.getProperty(key);

} else if (key.startsWith(BASE_KEY)) {

String value = props.getProperty(key);

new_props.setProperty(key.substring(BASE_KEY.length()), value);

}

}

if (dataSourceClass == null)

throw new HibernateException("Property 'db.datasource' no defined.");

try {

dataSource = (DataSource) Class.forName(dataSourceClass).newInstance();

BeanUtils.populate(dataSource, new_props);

} catch (Exception e) {

throw new HibernateException(e);

}

}



}



接下来我们测试配置有没有成功

代码如下:

public static void main(String args[]) {

Configuration cfg = new Configuration();

cfg.configure();

SessionFactory sf = cfg.buildSessionFactory();

for (int i = 0; i < 100; i++) {

Session sess = sf.openSession();



TestGGBean pc = new TestGGBean();

pc.setCol1(1111);

pc.setCols("ddaaaa");

sess.save(pc);

sess.flush();

sess.close();

}

} 阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: