您的位置:首页 > 其它

FastDFS 客户端有没有提供资源池的必要

2017-06-28 16:47 155 查看
StorageClient 的 do_upload_file 方法:

/**
 * Upload a file to the storage server, either as a normal file or as a slave
 * file of an existing master file.
 * <p>
 * A slave upload is performed when {@code group_name} and {@code master_filename}
 * are both non-empty and {@code prefix_name} is not null; otherwise a normal
 * upload is performed.
 * <p>
 * The request header and fixed-size fields are written first, then
 * {@code callback} is asked to stream the file content. If {@code meta_list}
 * is non-empty the metadata is set afterwards; when that step fails the
 * freshly uploaded file is deleted again and {@code null} is returned.
 *
 * @param cmd the command code
 * @param group_name the group name to upload file to, can be empty
 * @param master_filename the master file name to generate the slave file
 * @param prefix_name the prefix name to generate the slave file
 * @param file_ext_name file ext name, do not include dot(.)
 * @param file_size the file size
 * @param callback the write data callback object
 * @param meta_list meta info array
 * @return 2 elements string array if success:<br>
 *         <ul><li> results[0]: the group name to store the file</li></ul>
 *         <ul><li> results[1]: the new created filename</li></ul>
 *         return null if fail (the error code is left in {@code this.errno})
 * @throws IOException on a network error while sending the request or reading
 *         the response; an IOException raised while setting the metadata is
 *         NOT propagated, it is reported as errno 5 and a null return
 * @throws MyException if the response body is not longer than the group name field
 */
protected String[] do_upload_file(byte cmd, String group_name, String master_filename,
        String prefix_name, String file_ext_name, long file_size, UploadCallback callback,
        NameValuePair[] meta_list) throws IOException, MyException
{
    byte[] header;
    byte[] ext_name_bs;
    String new_group_name;
    String remote_filename;
    boolean bNewConnection;
    Socket storageSocket;
    byte[] sizeBytes;
    byte[] hexLenBytes;
    byte[] masterFilenameBytes;
    boolean bUploadSlave;
    int offset;
    long body_len;

    bUploadSlave = ((group_name != null && group_name.length() > 0) &&
            (master_filename != null && master_filename.length() > 0) &&
            (prefix_name != null));
    if (bUploadSlave)
    {
        bNewConnection = this.newUpdatableStorageConnection(group_name, master_filename);
    }
    else
    {
        bNewConnection = this.newWritableStorageConnection(group_name);
    }

    try
    {
        storageSocket = this.storageServer.getSocket();

        // Fixed-width, zero-padded extension field (a new byte[] is already all zeros).
        ext_name_bs = new byte[ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN];
        if (file_ext_name != null && file_ext_name.length() > 0)
        {
            byte[] bs = file_ext_name.getBytes(ClientGlobal.g_charset);
            int ext_name_len = bs.length;
            if (ext_name_len > ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN)
            {
                // Over-long extensions are silently truncated to the field width.
                ext_name_len = ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN;
            }
            System.arraycopy(bs, 0, ext_name_bs, 0, ext_name_len);
        }

        if (bUploadSlave)
        {
            masterFilenameBytes = master_filename.getBytes(ClientGlobal.g_charset);

            // Slave layout: [master name length][file size] + prefix + ext + master name + data.
            sizeBytes = new byte[2 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE];
            body_len = sizeBytes.length + ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN + ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN
                    + masterFilenameBytes.length + file_size;

            // The length written on the wire must be the ENCODED byte count, i.e. the
            // same number of bytes that is appended to the packet below and counted in
            // body_len. Using String.length() would under-report multi-byte names.
            hexLenBytes = ProtoCommon.long2buff(masterFilenameBytes.length);
            System.arraycopy(hexLenBytes, 0, sizeBytes, 0, hexLenBytes.length);
            offset = hexLenBytes.length;
        }
        else
        {
            masterFilenameBytes = null;
            // Normal layout: [store path index (1 byte)][file size] + ext + data.
            sizeBytes = new byte[1 + 1 * ProtoCommon.FDFS_PROTO_PKG_LEN_SIZE];
            body_len = sizeBytes.length + ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN + file_size;

            sizeBytes[0] = (byte)this.storageServer.getStorePathIndex();
            offset = 1;
        }

        hexLenBytes = ProtoCommon.long2buff(file_size);
        System.arraycopy(hexLenBytes, 0, sizeBytes, offset, hexLenBytes.length);

        OutputStream out = storageSocket.getOutputStream();
        header = ProtoCommon.packHeader(cmd, body_len, (byte)0);
        // wholePkg holds everything except the file content, which the callback streams.
        byte[] wholePkg = new byte[(int)(header.length + body_len - file_size)];
        System.arraycopy(header, 0, wholePkg, 0, header.length);
        System.arraycopy(sizeBytes, 0, wholePkg, header.length, sizeBytes.length);
        offset = header.length + sizeBytes.length;
        if (bUploadSlave)
        {
            // Fixed-width, zero-padded prefix field; over-long prefixes are truncated.
            byte[] prefix_name_bs = new byte[ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN];
            byte[] bs = prefix_name.getBytes(ClientGlobal.g_charset);
            int prefix_name_len = bs.length;
            if (prefix_name_len > ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN)
            {
                prefix_name_len = ProtoCommon.FDFS_FILE_PREFIX_MAX_LEN;
            }
            if (prefix_name_len > 0)
            {
                System.arraycopy(bs, 0, prefix_name_bs, 0, prefix_name_len);
            }

            System.arraycopy(prefix_name_bs, 0, wholePkg, offset, prefix_name_bs.length);
            offset += prefix_name_bs.length;
        }

        System.arraycopy(ext_name_bs, 0, wholePkg, offset, ext_name_bs.length);
        offset += ext_name_bs.length;

        if (bUploadSlave)
        {
            System.arraycopy(masterFilenameBytes, 0, wholePkg, offset, masterFilenameBytes.length);
            offset += masterFilenameBytes.length;
        }

        out.write(wholePkg);

        // Let the callback stream the file content; a non-zero result aborts the upload.
        if ((this.errno=(byte)callback.send(out)) != 0)
        {
            return null;
        }

        ProtoCommon.RecvPackageInfo pkgInfo = ProtoCommon.recvPackage(storageSocket.getInputStream(),
                ProtoCommon.STORAGE_PROTO_CMD_RESP, -1);
        this.errno = pkgInfo.errno;
        if (pkgInfo.errno != 0)
        {
            return null;
        }

        if (pkgInfo.body.length <= ProtoCommon.FDFS_GROUP_NAME_MAX_LEN)
        {
            throw new MyException("body length: " + pkgInfo.body.length + " <= " + ProtoCommon.FDFS_GROUP_NAME_MAX_LEN);
        }

        // Decode with the same charset that is used to encode outgoing names,
        // instead of the platform default charset.
        new_group_name = new String(pkgInfo.body, 0, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN,
                ClientGlobal.g_charset).trim();
        remote_filename = new String(pkgInfo.body, ProtoCommon.FDFS_GROUP_NAME_MAX_LEN,
                pkgInfo.body.length - ProtoCommon.FDFS_GROUP_NAME_MAX_LEN, ClientGlobal.g_charset);
        String[] results = new String[2];
        results[0] = new_group_name;
        results[1] = remote_filename;

        if (meta_list == null || meta_list.length == 0)
        {
            return results;
        }

        // Set the metadata; on any failure roll the upload back by deleting the file.
        // An IOException here is deliberately converted into errno 5 + null return
        // (presumably EIO - confirm against the server's error codes). This matches
        // the previous behaviour, where a "return null" inside a finally block
        // discarded the rethrown exception. MyException still propagates untouched.
        int result;
        try
        {
            result = this.set_metadata(new_group_name, remote_filename,
                    meta_list, ProtoCommon.STORAGE_SET_METADATA_FLAG_OVERWRITE);
        }
        catch(IOException ex)
        {
            result = 5;
        }

        if (result != 0)
        {
            this.errno = (byte)result;
            this.delete_file(new_group_name, remote_filename);
            return null;
        }

        return results;
    }
    catch(IOException ex)
    {
        // A reused connection is in an unknown state after an I/O error: drop it.
        // (A connection opened by this call is closed in the finally block below.)
        if (!bNewConnection)
        {
            try
            {
                this.storageServer.close();
            }
            catch(IOException ex1)
            {
                ex1.printStackTrace();
            }
            finally
            {
                this.storageServer = null;
            }
        }

        throw ex;
    }
    finally
    {
        // A connection opened by this call is always closed before returning.
        if (bNewConnection)
        {
            try
            {
                this.storageServer.close();
            }
            catch(IOException ex1)
            {
                ex1.printStackTrace();
            }
            finally
            {
                this.storageServer = null;
            }
        }
    }
}


每次上传时都首先获取一个连接,上传完毕后 自动关闭连接。这里的连接指的是Socket连接。

将 TrackerClient、TrackerServer、StorageClient、StorageServer 做成池的话,也只是对象池,而非资源池,因为真正的资源(Socket 连接)在 Socket 客户端已经被关闭,在 Socket 服务器端可能被缓存。

Socket客户端创建过程。

StorageServer 的 构造方法

/**
 * Constructor. Obtains a socket via {@code ClientGlobal.getSocket(ip_addr, port)}
 * and hands it, together with the server address, to the superclass.
 *
 * @param ip_addr the ip address of storage server
 * @param port the port of storage server
 * @param store_path the store path index on the storage server; the signed byte
 *        is stored as its unsigned value (0..255)
 * @throws IOException declared by this constructor (presumably raised when the
 *         socket cannot be obtained - confirm against ClientGlobal.getSocket)
 */
public StorageServer(String ip_addr, int port, byte store_path) throws IOException
{
    super(ClientGlobal.getSocket(ip_addr, port), new InetSocketAddress(ip_addr, port));
    // Reinterpret the signed byte as unsigned: negative values map to 256 + store_path,
    // non-negative values are kept as they are.
    this.store_path_index = store_path & 0xFF;
}


TrackerServer 的 getSocket 方法:


/**
 * Get the socket of this server, opening it on first use.
 * If no socket is held yet, one is obtained from
 * {@code ClientGlobal.getSocket(this.inetSockAddr)} and kept for later calls.
 *
 * @return the socket
 * @throws IOException if a new socket has to be obtained and that fails
 */
public Socket getSocket() throws IOException
{
    // Reuse the socket that is already held, if any.
    if (this.sock != null)
    {
        return this.sock;
    }

    this.sock = ClientGlobal.getSocket(this.inetSockAddr);
    return this.sock;
}


ClientGlobal 的 getSocket 方法:


/**
 * Construct a Socket object and connect it to the given address.
 * The read timeout is taken from {@code ClientGlobal.g_network_timeout} and the
 * connect timeout from {@code ClientGlobal.g_connect_timeout}.
 * If configuring or connecting the socket fails, the socket is closed before
 * the failure is propagated, so no half-initialised socket is left open.
 *
 * @param addr InetSocketAddress object, including ip address and port
 * @return connected Socket object
 * @throws IOException if the timeout cannot be set or the connection cannot be established
 */
public static Socket getSocket(InetSocketAddress addr) throws IOException
{
    Socket sock = new Socket();
    boolean connected = false;
    try
    {
        sock.setSoTimeout(ClientGlobal.g_network_timeout);
        sock.connect(addr, ClientGlobal.g_connect_timeout);
        connected = true;
        return sock;
    }
    finally
    {
        if (!connected)
        {
            // Release the socket on any failure (checked or unchecked).
            try
            {
                sock.close();
            }
            catch (IOException ignored)
            {
                // Best effort only: the original failure is the one the caller must see.
            }
        }
    }
}


FastDFS 在客户端没有采用 Socket 池,而是每次都在客户端创建一个新的 Socket 连接;而在 Socket 服务器端(也就是图片服务器端),连接过来的 Socket 是被缓存起来的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: