您的位置:首页 > 运维架构 > Apache

基于apache commons pool实现自定义ftp连接池

2016-07-28 15:45 519 查看
  由于前段时间公司有批量上传文件的需求,功能虽然实现了,但是发现性能低下,原因在于上传到ftp时需要频繁地连接和断开ftp,造成了很大的网络开销,于是决定自己写一个ftp连接池,用于文件上传。简单地查了下apache commons pool的api,发现只要实现ObjectPool<T>和PoolableObjectFactory<T>接口便可实现一个简单的自定义连接池。

//实现ObjectPool

package com.pool;

import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.ObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A simple {@link ObjectPool} of {@link FTPClient} connections backed by a bounded
 * {@link BlockingQueue}.
 *
 * <p>The pool is pre-filled with {@code poolSize} clients created by the supplied
 * {@link FTPClientFactory}; the queue capacity is {@code poolSize * 2}.
 * {@link #borrowObject()} blocks until an idle client is available.
 *
 * @author PYY
 */
public class FTPClientPool implements ObjectPool<FTPClient> {

    private static final Logger logger = LoggerFactory.getLogger(FTPClientPool.class);

    private static final int DEFAULT_POOL_SIZE = 10;

    /** Seconds to wait for free queue space before giving up on queuing a client. */
    private static final long OFFER_TIMEOUT_SECONDS = 3;

    private final BlockingQueue<FTPClient> pool;

    private final FTPClientFactory factory;

    /**
     * Creates a pool pre-filled with {@value #DEFAULT_POOL_SIZE} clients.
     *
     * @param factory factory used to create, validate and destroy clients
     * @throws Exception if a client cannot be created
     */
    public FTPClientPool(FTPClientFactory factory) throws Exception {
        this(DEFAULT_POOL_SIZE, factory);
    }

    /**
     * Creates a pool pre-filled with {@code poolSize} clients.
     *
     * @param poolSize number of clients to create up front
     * @param factory  factory used to create, validate and destroy clients
     * @throws Exception if a client cannot be created
     */
    public FTPClientPool(int poolSize, FTPClientFactory factory) throws Exception {
        this.factory = factory;
        this.pool = new ArrayBlockingQueue<FTPClient>(poolSize * 2);
        initPool(poolSize);
    }

    /**
     * Fills the pool with the requested number of clients.
     *
     * @param maxPoolSize number of clients to create
     * @throws Exception if a client cannot be created; clients created so far are destroyed
     */
    private void initPool(int maxPoolSize) throws Exception {
        try {
            for (int created = 0; created < maxPoolSize; created++) {
                this.addObject();
            }
        } catch (Exception e) {
            // Do not leave half a pool of open sockets behind when construction fails.
            close();
            throw e;
        }
    }

    /**
     * Takes an idle client from the pool, blocking until one is available.
     *
     * <p>If the client fails validation it is destroyed and a freshly created client is
     * returned in its place, so the number of live connections stays the same.
     *
     * @return a validated or newly created client, never {@code null}
     * @throws NoSuchElementException if the factory cannot supply a replacement client
     * @throws Exception if the wait is interrupted or the factory fails
     */
    @Override
    public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
        // take() blocks and never returns null, so no null handling is needed here.
        FTPClient client = pool.take();
        if (factory.validateObject(client)) {
            return client;
        }

        // The client is already out of the queue; close its socket before replacing it.
        destroyQuietly(client);
        FTPClient replacement = factory.makeObject();
        if (replacement == null) {
            throw new NoSuchElementException("unable to create a replacement ftp client");
        }
        return replacement;
    }

    /**
     * Puts a client back into the pool. If no queue slot becomes free within
     * {@value #OFFER_TIMEOUT_SECONDS} seconds the client is destroyed instead.
     *
     * @param client the client to return; {@code null} is ignored
     * @throws Exception if the wait is interrupted or destroying the client fails
     */
    @Override
    public void returnObject(FTPClient client) throws Exception {
        if (client == null) {
            return;
        }
        if (!pool.offer(client, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            factory.destroyObject(client);
        }
    }

    /**
     * Removes a broken client from the pool (if it is queued) and destroys it.
     * No replacement is created; call {@link #addObject()} to restore capacity.
     *
     * @param client the client to discard; {@code null} is ignored
     * @throws Exception if destroying the client fails
     */
    @Override
    public void invalidateObject(FTPClient client) throws Exception {
        if (client == null) {
            return;
        }
        pool.remove(client);
        factory.destroyObject(client);
    }

    /**
     * Creates a new client and queues it as idle. If no queue slot becomes free within
     * {@value #OFFER_TIMEOUT_SECONDS} seconds the new client is destroyed again.
     *
     * @throws IllegalStateException if the factory returns no client
     * @throws Exception if the factory fails or the wait is interrupted
     */
    @Override
    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
        FTPClient client = factory.makeObject();
        if (client == null) {
            throw new IllegalStateException("factory did not create an ftp client");
        }
        if (!pool.offer(client, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            logger.warn("ftp client pool is full, discarding newly created client");
            factory.destroyObject(client);
        }
    }

    /**
     * Returns the number of clients currently queued in the pool.
     */
    @Override
    public int getNumIdle() {
        return pool.size();
    }

    /**
     * Borrowed clients are not tracked by this implementation; always returns 0.
     */
    @Override
    public int getNumActive() {
        return 0;
    }

    /**
     * Not implemented; idle clients stay in the pool.
     */
    @Override
    public void clear() throws Exception, UnsupportedOperationException {

    }

    /**
     * Destroys every client currently queued in the pool. A failure to destroy one client
     * is logged and does not stop the remaining clients from being destroyed.
     */
    @Override
    public void close() {
        FTPClient client;
        // poll() never blocks, unlike take(), so close() cannot hang on an empty queue.
        while ((client = pool.poll()) != null) {
            try {
                factory.destroyObject(client);
            } catch (Exception e) {
                logger.error("close ftp client pool failed...", e);
            }
        }
    }

    /** Destroys a client, logging instead of propagating any failure. */
    private void destroyQuietly(FTPClient client) {
        try {
            factory.destroyObject(client);
        } catch (Exception e) {
            logger.warn("failed to destroy invalid ftp client", e);
        }
    }

}


// PoolableObjectFactory

package com.pool;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool.PoolableObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that creates, validates and destroys {@link FTPClient} connections for the pool,
 * using the settings held in an {@link FTPClientConfig}.
 *
 * @author PYY
 */
public class FTPClientFactory implements PoolableObjectFactory<FTPClient> {

    private static final Logger logger = LoggerFactory.getLogger(FTPClientFactory.class);

    /** Buffer size used when the configuration does not specify a positive value. */
    private static final int DEFAULT_BUFFER_SIZE = 1024;

    private final FTPClientConfig config;

    /**
     * @param config connection settings applied to every client this factory creates
     */
    public FTPClientFactory(FTPClientConfig config) {
        this.config = config;
    }

    /**
     * Connects to the configured server, logs in and applies the transfer settings.
     *
     * @return a connected, logged-in client; never {@code null}
     * @throws Exception if the server refuses the connection, the login is rejected, or an
     *                   I/O error occurs; the connection is closed before the exception
     *                   propagates
     */
    @Override
    public FTPClient makeObject() throws Exception {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setConnectTimeout(config.getClientTimeout());
        // The control encoding only takes effect if it is set before connect().
        if (config.getEncoding() != null) {
            ftpClient.setControlEncoding(config.getEncoding());
        }
        try {
            ftpClient.connect(config.getHost(), config.getPort());
            int reply = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                logger.warn("FTPServer refused connection");
                throw new java.io.IOException("FTPServer refused connection, reply code " + reply);
            }
            if (!ftpClient.login(config.getUsername(), config.getPassword())) {
                logger.warn("ftpClient login failed... username is {}", config.getUsername());
                // An unauthenticated client is useless to borrowers; fail instead of pooling it.
                throw new java.io.IOException(
                        "ftpClient login failed... username is " + config.getUsername());
            }
            ftpClient.setFileType(config.getTransferFileType());
            int bufferSize = config.getBufferSize();
            ftpClient.setBufferSize(bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE);
            // Constant-first comparison tolerates an unset (null) passive-mode value.
            if ("true".equals(config.getPassiveMode())) {
                ftpClient.enterLocalPassiveMode();
            }
            return ftpClient;
        } catch (Exception e) {
            logger.error("create ftp connection failed...", e);
            disconnectQuietly(ftpClient);
            throw e;
        }
    }

    /**
     * Logs the client out (when connected) and always disconnects it.
     *
     * @param ftpClient the client to close; {@code null} is ignored
     * @throws Exception if logout or disconnect fails
     */
    @Override
    public void destroyObject(FTPClient ftpClient) throws Exception {
        if (ftpClient == null) {
            return;
        }
        try {
            if (ftpClient.isConnected()) {
                ftpClient.logout();
            }
        } catch (Exception e) {
            logger.error("ftp client logout failed...", e);
            throw e;
        } finally {
            ftpClient.disconnect();
        }
    }

    /**
     * Checks that the connection is still usable by sending a NOOP command.
     *
     * @param ftpClient the client to check
     * @return {@code true} if the server acknowledged the NOOP; {@code false} if it did not,
     *         the client is {@code null}, or an error occurred
     */
    @Override
    public boolean validateObject(FTPClient ftpClient) {
        if (ftpClient == null) {
            return false;
        }
        try {
            return ftpClient.sendNoOp();
        } catch (Exception e) {
            logger.error("Failed to validate client", e);
        }
        return false;
    }

    /** No per-borrow preparation is needed. */
    @Override
    public void activateObject(FTPClient obj) throws Exception {
        // Do nothing
    }

    /** No per-return clean-up is needed. */
    @Override
    public void passivateObject(FTPClient obj) throws Exception {
        // Do nothing
    }

    /** Closes the connection of a client whose setup failed, logging any secondary error. */
    private void disconnectQuietly(FTPClient ftpClient) {
        if (!ftpClient.isConnected()) {
            return;
        }
        try {
            ftpClient.disconnect();
        } catch (Exception e) {
            logger.warn("failed to disconnect ftp client after a failed setup", e);
        }
    }

}


初始化一个ftp连接时要对连接进行各项配置,为方便管理,需要封装一个配置类,代码如下

package com.pool;

/**
 * Mutable settings bean describing how to connect to an FTP server.
 * All properties start at their Java default values ({@code null}, {@code 0},
 * {@code false}) until set.
 */
public class FTPClientConfig {

    // --- server address and credentials ---
    private String host;
    private int port;
    private String username;
    private String password;

    // --- connection behaviour ---
    private String passiveMode;
    private String encoding;
    private int clientTimeout;
    private int retryTime;

    // --- transfer behaviour ---
    private int bufferSize;
    private int transferFileType;
    private boolean renameUploaded;

    /** @return the server host */
    public String getHost() {
        return this.host;
    }

    /** @param host the server host */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the server port */
    public int getPort() {
        return this.port;
    }

    /** @param port the server port */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the login user name */
    public String getUsername() {
        return this.username;
    }

    /** @param username the login user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the login password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the login password */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the passive-mode flag, stored as text */
    public String getPassiveMode() {
        return this.passiveMode;
    }

    /** @param passiveMode the passive-mode flag, stored as text */
    public void setPassiveMode(String passiveMode) {
        this.passiveMode = passiveMode;
    }

    /** @return the character encoding name */
    public String getEncoding() {
        return this.encoding;
    }

    /** @param encoding the character encoding name */
    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    /** @return the client timeout value */
    public int getClientTimeout() {
        return this.clientTimeout;
    }

    /** @param clientTimeout the client timeout value */
    public void setClientTimeout(int clientTimeout) {
        this.clientTimeout = clientTimeout;
    }

    /** @return the transfer buffer size */
    public int getBufferSize() {
        return this.bufferSize;
    }

    /** @param bufferSize the transfer buffer size */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** @return the numeric transfer file type */
    public int getTransferFileType() {
        return this.transferFileType;
    }

    /** @param transferFileType the numeric transfer file type */
    public void setTransferFileType(int transferFileType) {
        this.transferFileType = transferFileType;
    }

    /** @return the rename-uploaded flag */
    public boolean isRenameUploaded() {
        return this.renameUploaded;
    }

    /** @param renameUploaded the rename-uploaded flag */
    public void setRenameUploaded(boolean renameUploaded) {
        this.renameUploaded = renameUploaded;
    }

    /** @return the retry time value */
    public int getRetryTime() {
        return this.retryTime;
    }

    /** @param retryTime the retry time value */
    public void setRetryTime(int retryTime) {
        this.retryTime = retryTime;
    }

}


测试代码

package com.pool;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FTPClientTest {

private FTPClient ftpClient;

private Logger log = LoggerFactory.getLogger(this.getClass());

@Test
public void test() throws Exception {
FTPClientConfig config = new FTPClientConfig();
config.setHost("192.168.1.1");
config.setPort(21);
config.setUsername("test");
config.setPassword("test");
config.setEncoding("utf-8");
config.setPassiveMode("false");
config.setClientTimeout(30 * 1000);

FTPClientFactory factory = new FTPClientFactory(config);
FTPClientPool pool = new FTPClientPool(factory);
ftpClient = pool.borrowObject();
uploadFile(new File("d:/detail/123.txt"), "/upload/files/");
}

/***
* 上传Ftp文件
* @param localFile 当地文件
* @param romotUpLoadePath上传服务器路径 - 应该以/结束
* */
public boolean uploadFile(File localFile, String romotUpLoadePath) {
BufferedInputStream inStream = null;
boolean success = false;
try {
this.ftpClient.changeWorkingDirectory(romotUpLoadePath);// 改变工作路径
inStream = new BufferedInputStream(new FileInputStream(localFile));
log.info(localFile.getName() + "开始上传.....");
success = this.ftpClient.storeFile(localFile.getName(), inStream);
if (success == true) {
log.info(localFile.getName() + "上传成功");
return success;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
log.error(localFile + "未找到");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (inStream != null) {
try {
inStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return success;
}
}


至此,一个简单的连接池就完成了,接下来可以根据自己的需要完善该连接池,例如增加维护空闲连接的容器、保存正在使用中的连接的容器等。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java apache ftp 连接池