您的位置:首页 > 编程语言 > Java开发

JAVA并发- 典型连接池的实现

2016-08-11 14:11 309 查看
package com.xyz.connpool;

public interface IConnection {
/**
* 关闭当前连接
*/
public void close();
/**
* 销毁当前连接
*/
public void destroy();
//应该具备的其他方法
}


public class Connection implements IConnection {
private String name;
private IConnPool connPool;
public Connection(IConnPool connPool,String name){
this.connPool=connPool;
this.name=name;
}
@Override
public void close() {
this.connPool.releaseConn(this);
}
@Override
public void destroy() {
System.out.println("destroy connection---"+name);
}

@Override
public String toString() {
// TODO Auto-generated method stub
super.toString();
return name;
}

}


public interface IConnPool {
/**
* 获得一个可用连接,超过最大连接数时线程等待,直到有有连接释放时返回一个可用连接或者超时返回null
* @param maxWaitTime
* @return
*/
public IConnection getConn();
/**
* 将释放的空闲连接加入空闲连接池
* @param conn
*/
public void releaseConn(IConnection conn);
/**
* 销毁连接池
*/
public void destroy();
/**
* 获取当前线程对应的连接
* @return
*/
public IConnection getCurrentConn();
}


import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ConnPool implements IConnPool {
private int initNum;
private int maxNum;
private int hasAlready;
private long maxWaitTime;
private final List<IConnection> freeConnList;
private final List<IConnection> activeConnList;
private static ThreadLocal<IConnection> threadLocalVar=new ThreadLocal<IConnection>(){
protected IConnection initialValue() {
return null;
};
};
public ConnPool(){
this.initNum=3;
this.maxNum=5;
this.hasAlready=0;
this.maxWaitTime=1000;
this.freeConnList=new ArrayList<IConnection>(maxNum);
this.activeConnList=new ArrayList<IConnection>(maxNum);
init();
}
public ConnPool(int initNum,int maxNum,long maxWaitTime){
this.initNum=initNum;
this.maxNum=maxNum;
this.hasAlready=0;
this.maxWaitTime=maxWaitTime;
this.freeConnList=new ArrayList<IConnection>(maxNum);
this.activeConnList=new ArrayList<IConnection>(maxNum);
init();
}
private void init(){
for(int i=0;i<this.initNum;i++){
synchronized (this) {
this.freeConnList.add(new Connection(this,UUID.randomUUID().toString()));
hasAlready++;
}
}
}
@Override
public synchronized IConnection getConn()  {
IConnection conn=null;
if(!this.freeConnList.isEmpty()){
conn=this.freeConnList.remove(0);
if(conn!=null)
threadLocalVar.set(conn);//为线程绑定连接
this.activeConnList.add(conn);
return conn;
}
if(hasAlready<maxNum){
conn = new Connection(this,UUID.randomUUID().toString());
hasAlready++;
if(conn!=null)
threadLocalVar.set(conn);//为线程绑定连接
this.activeConnList.add(conn);
return conn;
}
try {
this.wait(maxWaitTime);
if(!this.freeConnList.isEmpty()){
conn=this.freeConnList.remove(0);
if(conn!=null)
threadLocalVar.set(conn);//为线程绑定连接
this.activeConnList.add(conn);
return conn;
}
return null;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
@Override
public synchronized void releaseConn(IConnection conn) {
if(this.activeConnList.contains(conn)){
this.freeConnList.add(conn);
this.activeConnList.remove(conn);
threadLocalVar.remove();
this.notify();
}
}
@Override
public synchronized void destroy() {
IConnection conn=null;
int temp=this.freeConnList.size();
for(int i=0;i<temp;i++){
conn=this.freeConnList.remove(0);
conn.destroy();
}
temp=this.activeConnList.size();
for(int i=0;i<temp;i++){
conn=this.activeConnList.remove(0);
conn.destroy();
}
conn=null;
this.hasAlready=0;
}
@Override
public IConnection getCurrentConn() {
return threadLocalVar.get();
}

}


package com.xyz.connpool;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConnPoolManager {
private int initConnNum;
private int maxConnNum;
private long maxWaitTime;
// 连接池存放
public Map<Integer,IConnPool> pools=new ConcurrentHashMap<Integer, IConnPool>();
//	public Hashtable<Integer,IConnPool> pools = new Hashtable<Integer, IConnPool>();

public ConnPoolManager(int initConnNum,int maxConnNum, long maxWaitTime){
this.initConnNum=initConnNum;
this.maxConnNum=maxConnNum;
this.maxWaitTime=maxWaitTime;
init();
}

// 初始化所有的连接池
private void init(){
for(int hostId =0;hostId<3;hostId++){
ConnPool connPool=new ConnPool(this.initConnNum,this.maxConnNum, this.maxWaitTime);
if(connPool != null){
pools.put(hostId,connPool);
System.out.println("Info:Init connPool successed for hostId ->" +hostId);
}
}
}

/**
* 根据主机id获取连接池
* @param hostId
* @return
*/
public IConnPool getPool(int hostId){
IConnPool pool = null;
if(pools.containsKey(hostId)){
pool = pools.get(hostId);
}
return pool;
}

/**
* 获取一个到主机hostId的连接
* @param hostId
* @return
*/
public IConnection getConn(int hostId){
IConnPool connPool = getPool(hostId);
if(connPool==null){
System.out.println("Error:Can't find this connecion pool for hostId->"+hostId);
return null;
}
return connPool.getConn();
}

/**
* 关闭指定主机的某个链接
* @param hostId
* @param conn
*/
public void close(int hostId,IConnection conn){
IConnPool pool = getPool(hostId);
if(pool != null)
pool.releaseConn(conn);
}
/**
* 注销某个主机的连接池
* @param poolName
*/
public void destroy(int hostId){
IConnPool pool = getPool(hostId);
if(pool != null){
pool.destroy();
}
pools.remove(pool);
}
/**
* 注销所有连接池
*/
public void destory(){
for(int hostId:pools.keySet()){
this.destroy(hostId);
}
}
}


package com.xyz.connpool;
import java.util.concurrent.CountDownLatch;

public class Demo {
public final static int REQUESTNUM=200;
public final static int DURATION=20;

public static void main(String [] args) throws Exception{
final CountDownLatch startGate=new CountDownLatch(1);
final CountDownLatch endGate=new CountDownLatch(REQUESTNUM);

final ConnPoolManager cpm=new ConnPoolManager(5, 10, 1000);
final IConnection conns[] =new IConnection[REQUESTNUM];
for(int i=0;i<REQUESTNUM;i++){
final int flag=i;
new Thread(new Runnable() {
@Override
public void run() {
try {
startGate.await();
} catch (InterruptedException e1) {}

conns[flag]=cpm.getConn(0);
System.out.println(conns[flag]);
try {
Thread.sleep(DURATION);
} catch (InterruptedException e) {}
cpm.close(0, conns[flag]);

endGate.countDown();
}
}).start();
}

long begin = System.currentTimeMillis();
startGate.countDown();
endGate.await();
long over = System.currentTimeMillis();
System.out.println(over-begin);
//	    Thread.sleep(5000);
cpm.destory();
}
}


源代码下载https://github.com/ZhenShiErGe/ConnectionPool.git
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: