基于apache commons pool实现自动以ftp连接池
2016-07-28 15:45
519 查看
由于前段时间公司有批量上传文件的需求,功能实现了,但是发现性能低下,原因在于上传到ftp时需要频繁的链接和断开ftp,造成了很大的网络开销,于是决定自己写一个ftp连接池,用于文件上传,简单的查了下apache的api,发现只要实现ObjectPool和PoolableObjectFactory<T>接口便可实现一个简单的自定义连接池。
//实现ObjectPool
// PoolableObjectFactory
初始化一个ftp链接时要多链接进行设置配置,为方便管理,需要封装一个配置类,代码如下
测试代码
至此,一个简单的连接池就完成了,接下来可以根据自己的需要完善该连接池,如空闲连接的容器,正在使用中的链接保存的容器实现等
//实现ObjectPool
package com.pool; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.pool2.ObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 自定义实现ftp连接池 * @author PYY * */ public class FTPClientPool implements ObjectPool<FTPClient> { private static Logger logger = LoggerFactory.getLogger(FTPClient.class); private static final int DEFAULT_POOL_SIZE = 10; private BlockingQueue<FTPClient> pool; private FTPClientFactory factory; public FTPClientPool(FTPClientFactory factory) throws Exception { this(DEFAULT_POOL_SIZE, factory); } public FTPClientPool(int poolSize, FTPClientFactory factory) throws Exception { this.factory = factory; this.pool = new ArrayBlockingQueue<FTPClient>(poolSize * 2); initPool(poolSize); } /** * 初始化连接池 * @param maxPoolSize * 最大连接数 * @throws Exception */ private void initPool(int maxPoolSize) throws Exception { int count = 0; while(count < maxPoolSize) { this.addObject(); count++; } } /** * 从连接池中获取对象 */ @Override public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException { FTPClient client = pool.take(); if(client == null) { client = factory.makeObject(); addObject(); } else if(!factory.validateObject(client)) { invalidateObject(client); client = factory.makeObject(); addObject(); } return client; } /** * 返还一个对象(链接) */ @Override public void returnObject(FTPClient client) throws Exception { if ((client != null) && !pool.offer(client,3,TimeUnit.SECONDS)) { try { factory.destroyObject(client); } catch (Exception e) { throw e; } } } /** * 移除无效的对象(FTP客户端) */ @Override public void invalidateObject(FTPClient client) throws Exception { pool.remove(client); } /** * 增加一个新的链接,超时失效 */ @Override public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException { pool.offer(factory.makeObject(), 3, TimeUnit.SECONDS); } /** * 获取空闲链接数(这里暂不实现) */ @Override public int getNumIdle() { return 0; } /** * 获取正在被使用的链接数 */ @Override public int getNumActive() { return 0; } @Override public void clear() throws Exception, UnsupportedOperationException { } /** * 关闭连接池 */ @Override public void close() { try { while(pool.iterator().hasNext()) { FTPClient client = pool.take(); factory.destroyObject(client); } } catch(Exception e) { logger.error("close ftp client pool failed...{}", e); } } }
// PoolableObjectFactory
package com.pool; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.pool.PoolableObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 连接池工厂类 * @author PYY * */ public class FTPClientFactory implements PoolableObjectFactory<FTPClient> { private static Logger logger = LoggerFactory.getLogger(FTPClientFactory.class); private FTPClientConfig config; public FTPClientFactory(FTPClientConfig config) { this.config = config; } @Override public FTPClient makeObject() throws Exception { FTPClient ftpClient = new FTPClient(); ftpClient.setConnectTimeout(config.getClientTimeout()); try { ftpClient.connect(config.getHost(), config.getPort()); int reply = ftpClient.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftpClient.disconnect(); logger.warn("FTPServer refused connection"); return null; } boolean result = ftpClient.login(config.getUsername(), config.getPassword()); if (!result) { logger.warn("ftpClient login failed... username is {}", config.getUsername()); } ftpClient.setFileType(config.getTransferFileType()); ftpClient.setBufferSize(1024); ftpClient.setControlEncoding(config.getEncoding()); if (config.getPassiveMode().equals("true")) { ftpClient.enterLocalPassiveMode(); } } catch (Exception e) { logger.error("create ftp connection failed...{}", e); throw e; } return ftpClient; } @Override public void destroyObject(FTPClient ftpClient) throws Exception { try { if(ftpClient != null && ftpClient.isConnected()) { ftpClient.logout(); } } catch (Exception e) { logger.error("ftp client logout failed...{}", e); throw e; } finally { if(ftpClient != null) { ftpClient.disconnect(); } } } @Override public boolean validateObject(FTPClient ftpClient) { try { return ftpClient.sendNoOp(); } catch (Exception e) { logger.error("Failed to validate client: {}", e); } return false; } @Override public void activateObject(FTPClient obj) throws Exception { //Do nothing } @Override public void passivateObject(FTPClient obj) throws Exception { //Do nothing } }
初始化一个ftp链接时要多链接进行设置配置,为方便管理,需要封装一个配置类,代码如下
package com.pool; public class FTPClientConfig { private String host; private int port; private String username; private String password; private String passiveMode; private String encoding; private int clientTimeout; private int bufferSize; private int transferFileType; private boolean renameUploaded; private int retryTime; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getPassiveMode() { return passiveMode; } public void setPassiveMode(String passiveMode) { this.passiveMode = passiveMode; } public String getEncoding() { return encoding; } public void setEncoding(String encoding) { this.encoding = encoding; } public int getClientTimeout() { return clientTimeout; } public void setClientTimeout(int clientTimeout) { this.clientTimeout = clientTimeout; } public int getBufferSize() { return bufferSize; } public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; } public int getTransferFileType() { return transferFileType; } public void setTransferFileType(int transferFileType) { this.transferFileType = transferFileType; } public boolean isRenameUploaded() { return renameUploaded; } public void setRenameUploaded(boolean renameUploaded) { this.renameUploaded = renameUploaded; } public int getRetryTime() { return retryTime; } public void setRetryTime(int retryTime) { this.retryTime = retryTime; } }
测试代码
package com.pool; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import org.apache.commons.net.ftp.FTPClient; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FTPClientTest { private FTPClient ftpClient; private Logger log = LoggerFactory.getLogger(this.getClass()); @Test public void test() throws Exception { FTPClientConfig config = new FTPClientConfig(); config.setHost("192.168.1.1"); config.setPort(21); config.setUsername("test"); config.setPassword("test"); config.setEncoding("utf-8"); config.setPassiveMode("false"); config.setClientTimeout(30 * 1000); FTPClientFactory factory = new FTPClientFactory(config); FTPClientPool pool = new FTPClientPool(factory); ftpClient = pool.borrowObject(); uploadFile(new File("d:/detail/123.txt"), "/upload/files/"); } /*** * 上传Ftp文件 * @param localFile 当地文件 * @param romotUpLoadePath上传服务器路径 - 应该以/结束 * */ public boolean uploadFile(File localFile, String romotUpLoadePath) { BufferedInputStream inStream = null; boolean success = false; try { this.ftpClient.changeWorkingDirectory(romotUpLoadePath);// 改变工作路径 inStream = new BufferedInputStream(new FileInputStream(localFile)); log.info(localFile.getName() + "开始上传....."); success = this.ftpClient.storeFile(localFile.getName(), inStream); if (success == true) { log.info(localFile.getName() + "上传成功"); return success; } } catch (FileNotFoundException e) { e.printStackTrace(); log.error(localFile + "未找到"); } catch (IOException e) { e.printStackTrace(); } finally { if (inStream != null) { try { inStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return success; } }
至此,一个简单的连接池就完成了,接下来可以根据自己的需要完善该连接池,如空闲连接的容器,正在使用中的链接保存的容器实现等
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序