您的位置:首页 > 编程语言 > Java开发

Fastdfs-javaapi-连接池

2016-01-03 11:30 405 查看
1 简绍

      大家都知道fastdfs分为tracker server和storage server, tracker server是跟踪服务器,主要做调度工作,在访问上起负载均衡的作用 。storage server是存储服务器,主要负责文件的存储。

  我们使用java api在分布式文件系统的文件去上传、修改、删除等操作时,有以下几步:

申请与tracker server的连接
TrackerClient trackerClient = new TrackerClient();

TrackerServer trackerServer= trackerClient.getConnection();

通过trackerServer得到与storage server的连接的客户端
StorageServer ss = tc.getStoreStorage(ts);

StorageClient1 client1 = new StorageClient1(trackerServer, ss);

上传文件
client1.upload_file1(fileBuff, fileExtName, null);

关闭连接
ss.close();

trackerServer.close();

最终可以完成我们想要的操作,但是这两次创建的连接都是tcp/ip连接,如果每次创建完连接我们都再关闭连接。这个过程是很耗时的。

     通过api申请的连接并不支持高发发(即一个storage连接只能上传一个文件),我们必须保证同一时刻一个连接上传一个文件。

由于我们用fastdfs做文件服务器,通过web的管理平台将文件上传至分布式文件系统中,每次申请完连接再关闭连接,对我们来说会延长上传文件的时间,而且上传文件高并发下,申请的连接可能突然增至几百个,这样我们的服务器的性能损耗太大了。

 

2 java api 源代码分析

通过查看fastdfs java api的源代码了解到。通过trackerServer和storageServer得到的new StorageClient1(trackerServer, storageServer);client对象,在操作文件时,会自动检查trackerServer和storageServer是否为空,如果为空,程序会自动为任一server创建连接,待操作完成后,将创建的连接关闭。

如果storageServer为null,则程序自动创建trackerServer,根据trackerServer得到storageServer,并返回storageServer,在返回storageServer之前会关闭trackerServer。通过storageServer上传完文件之后,关闭storageServer.

如果都不空null,则api中不会关闭连接。

 

3 结合实际。

      由于我们搭建的文件系统架构是,一个tracker和三个storage.

 

 

 

 

 

 

 

 

 

 

从这个图形可以看出,我们的tracker连接是没有必要改变的,申请一次就可以,(直接点)故我们可以为tracker创建连接池。

连接池代码如下ConnectionPool:

 

 

import java.io.IOException;

import java.net.InetSocketAddress;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.TimeUnit;

 

import org.csource.fastdfs.ClientGlobal;

import org.csource.fastdfs.StorageClient1;

import org.csource.fastdfs.StorageServer;

import org.csource.fastdfs.TrackerClient;

import org.csource.fastdfs.TrackerGroup;

import org.csource.fastdfs.TrackerServer;

 

public class ConnectionPool {

 

 // the limit of connection instance

private int size = 5;

// busy connection instances

private ConcurrentHashMap<StorageClient1, Object> busyConnectionPool = null; 

// idle  connection instances

private ArrayBlockingQueue<StorageClient1> idleConnectionPool = null; 

 

private final static String tgStr = "192.168.4.117";

private final static int port = 22122;

 

private Object obj = new Object();

 

// class method

// singleton

private ConnectionPool() {

busyConnectionPool = new ConcurrentHashMap<StorageClient1, Object>();

idleConnectionPool = new ArrayBlocking
216cc
Queue<StorageClient1>(size);

init(size);

 

};

 

private static ConnectionPool instance = new ConnectionPool();

 

// get the connection pool instance

public static ConnectionPool getPoolInstance() {

return instance;

}

 

// class method

// init the connection pool

private void init(int size) {

initClientGlobal();

TrackerServer trackerServer = null;

try {

TrackerClient trackerClient = new TrackerClient();

//Only tracker

trackerServer = trackerClient.getConnection();

for (int i = 0; i < size; i++) {

StorageServer storageServer = null;

StorageClient1 client1 = new StorageClient1(trackerServer,

storageServer);

idleConnectionPool.add(client1);

}

} catch (IOException e) {

e.printStackTrace();

}finally{

if(trackerServer!=null){

try {

trackerServer.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

 

 

// 1. pop one connection from the idleConnectionPool,

// 2. push the connection into busyConnectionPool;

// 3. return the connection

// 4. if no idle connection, do wait for wait_time seconds, and check again

public StorageClient1 checkout(int waitTimes) throws InterruptedException {

StorageClient1 client1 = idleConnectionPool.poll(waitTimes,

TimeUnit.SECONDS);

busyConnectionPool.put(client1, obj);

return client1;

}

 

// 1. pop the connection from busyConnectionPool;

// 2. push the connection into idleConnectionPool;

// 3. do nessary cleanup works.

public void checkin(StorageClient1 client1) {

if (busyConnectionPool.remove(client1)!=null) {

idleConnectionPool.add(client1);

}

}

 

 

// so if the connection was broken due to some erros (like

// : socket init failure, network broken etc), drop this connection

// from the busyConnectionPool, and init one new connection.

public void drop(StorageClient1 client1) {

if (busyConnectionPool.remove(client1)!=null) {

TrackerServer trackerServer = null;

try {

TrackerClient trackerClient = new TrackerClient();

//TODO 此处有内存泄露,因为trackerServer没有关闭连接

trackerServer = trackerClient.getConnection();

StorageServer storageServer = null;

StorageClient1 newClient1 = new StorageClient1(trackerServer, storageServer);

idleConnectionPool.add(newClient1);

} catch (IOException e) {

e.printStackTrace();

}finally{

if(trackerServer!=null){

try {

trackerServer.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}

 

private void initClientGlobal() {

InetSocketAddress[] trackerServers = new InetSocketAddress[1];

trackerServers[0] = new InetSocketAddress(tgStr, port);

ClientGlobal.setG_tracker_group(new TrackerGroup(trackerServers));

// 连接超时的时限,单位为毫秒

ClientGlobal.setG_connect_timeout(2000);

// 网络超时的时限,单位为毫秒

ClientGlobal.setG_network_timeout(30000);

ClientGlobal.setG_anti_steal_token(false);

// 字符集

ClientGlobal.setG_charset("UTF-8");

ClientGlobal.setG_secret_key(null);

}

 

}

 

 

 

 

 

 

 

上传文件类接口:ImageServer

 

import java.io.File;

import java.io.IOException;

 

/**

 * 图片文件上传

 * @author zhanghua

 *

 */

public interface ImageServer {

/**

 * 上传文件

 * @param file 文件

 * @return 文件存储路径

 * @throws IOException

 * @throws Exception

 */

public String uploadFile(File file) throws IOException, Exception ;

/**

 * 上传文件

 * @param file  文件

 * @param name  文件名称

 * @return  文件存储路径

 * @throws IOException

 * @throws Exception

 */

public String uploadFile(File file, String name) throws IOException, Exception ;

/**

 * 上传文件

 * @param fileBuff 二进制数组

 * @param name

 * @return

 */ 

public String uploadFile(byte[] fileBuff,String name) throws IOException, Exception;

 

}

 

 

 

 

上传文件实现:ImageServerImpl

 

 

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.IOException;

import java.net.InetSocketAddress;

 

import org.csource.fastdfs.ClientGlobal;

import org.csource.fastdfs.StorageClient1;

import org.csource.fastdfs.TrackerGroup;

 

import com.imageserver.pool.ConnectionPool;

 

public class ImageServerImpl implements ImageServer {

 

private final static String tgStr = "192.168.4.117";

private final static int port = 22122;

 

public ImageServerImpl() {

init();

}

 

public String uploadFile(File file) throws IOException, Exception {

return uploadFile(file, file.getName());

}

 

public String uploadFile(File file, String name) throws IOException,

Exception {

byte[] fileBuff = getFileBuffer(file);

String fileExtName = getFileExtName(name);

return send(fileBuff, fileExtName);

}

 

public String uploadFile(byte[] fileBuff, String name) throws IOException,

Exception {

String fileExtName = getFileExtName(name);

return send(fileBuff, fileExtName);

}

 

private String send(byte[] fileBuff, String fileExtName)

throws IOException, Exception {

String upPath = null;

StorageClient1 client1 = null;

try {

client1 = ConnectionPool.getPoolInstance().checkout(10);

upPath = client1.upload_file1(fileBuff, fileExtName, null);

ConnectionPool.getPoolInstance().checkin(client1);

} catch (InterruptedException e) {

//确实没有空闲连接,并不需要删除与fastdfs连接

throw e;

} catch (Exception e) {

//发生io异常等其它异常,默认删除这次连接重新申请

ConnectionPool.getPoolInstance().drop(client1);

e.printStackTrace();

throw e;

}

return upPath;

}

 

private String getFileExtName(String name) {

String extName = null;

if (name != null && name.contains(".")) {

extName = name.substring(name.lastIndexOf(".") + 1);

}

return extName;

}

 

private byte[] getFileBuffer(File file) {

byte[] fileByte = null;

try {

FileInputStream fis = new FileInputStream(file);

fileByte = new byte[fis.available()];

fis.read(fileByte);

} catch (FileNotFoundException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

return fileByte;

}

 

private void init() {

InetSocketAddress[] trackerServers = new InetSocketAddress[1];

trackerServers[0] = new InetSocketAddress(tgStr, port);

ClientGlobal.setG_tracker_group(new TrackerGroup(trackerServers));

// 连接超时的时限,单位为毫秒

ClientGlobal.setG_connect_timeout(2000);

// 网络超时的时限,单位为毫秒

ClientGlobal.setG_network_timeout(30000);

ClientGlobal.setG_anti_steal_token(false);

// 字符集

ClientGlobal.setG_charset("UTF-8");

ClientGlobal.setG_secret_key(null);

}

}

 

 

 

 

上传文件测试类:ImageServerTest

 

 

import java.io.File;

import java.io.IOException;

import java.util.concurrent.atomic.AtomicInteger;

 

public class ImageServerTest {

 

ImageServer s = new ImageServerImpl();

AtomicInteger count = new AtomicInteger();

 

static long stime = 0;

 

public static void main(String[] args) {

stime = System.currentTimeMillis();

new ImageServerTest().mutiUP();

}

 

public void statTime() {

if (count.getAndIncrement() == 9) {

System.out.println("用时:" + (System.currentTimeMillis() - stime));

}

}

 

public void mutiUP() {

for (int i = 0; i < 1; i++) {

new Thread() {

@Override

public void run() {

try {

for (int j = 0; j < 10; j++) {

String p = s.uploadFile(new File("F:\\pic\\1.jpg"));

System.out.println(p);

}

statTime();

} catch (IOException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

}

}

}.start();

}

}

}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  fastdfs 连接池 java