
Implementing Resumable Transfers on HDFS: Download

2017-06-02 15:59
When a server backed by HDFS needs to offer clients resumable downloads, it can use the seek method provided by the Hadoop API: the server positions the input stream at the offset the client requests, then reads the file from that breakpoint onward and streams it back to the user.

The implementation again relies on an agreed-upon offset to mark the breakpoint, but unlike the upload case, here the offset is maintained by the client. To resume a download, the client submits the file information, including the offset field, to the server; the server then uses seek to position the stream at the breakpoint and transfers the remaining bytes.
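For context, a minimal client-side sketch of this exchange might look as follows. The endpoint URL, parameter names, and resume bookkeeping are illustrative assumptions, not part of the original project; the only contract relied on is that the server streams the file starting at the submitted offset:

import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class ResumableDownloadClient {
    public static void main(String[] args) throws Exception {
        String filename = "encryptedfile.dat"; // hypothetical file name
        RandomAccessFile local = new RandomAccessFile(filename, "rw");
        long offset = local.length(); // resume after the bytes already on disk

        // Hypothetical servlet endpoint backed by downloadFromHDFSinOffset
        URL url = new URL("http://example.com/download?offset=" + offset
                + "&filename=" + URLEncoder.encode(filename, "UTF-8"));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (InputStream in = conn.getInputStream()) {
            local.seek(offset); // append after what was previously downloaded
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                local.write(buf, 0, n);
            }
        } finally {
            local.close();
            conn.disconnect();
        }
    }
}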

In the HDFSHandler class, resumable download is implemented as follows:

/**
 * Resumable download of a file from HDFS to the client.
 *
 * @param response
 * @param offSet
 *            offset to resume from
 * @param encryptfilename
 *            name of the encrypted file in HDFS
 * @throws IOException
 */
public static void downloadFromHDFSinOffset(HttpServletResponse response, long offSet, String encryptfilename)
        throws IOException {
    if (response == null || encryptfilename == null || encryptfilename.equals(""))
        return;

    response.setContentType("application/x-msdownload");
    response.addHeader("Content-Disposition", "attachment;filename=" + encryptfilename);
    ServletOutputStream sos = response.getOutputStream();

    DownloadInOffset dfb = null;
    try {
        dfb = new DownloadInOffset(encryptfilename);
        byte[] buffer = new byte[1024];
        long size = dfb.getFileSize(encryptfilename); // total file size
        System.out.println("HDFSHandler : getFileSize = " + size);
        int len = 0;     // bytes read per call
        long length = 0; // total bytes read so far
        if (offSet == 0) {
            len = dfb.download(buffer); // read from the start of the file
        } else {
            len = dfb.download(buffer, offSet); // seek to the offset, then read
        }
        // Loop until EOF; len is checked before writing so that a -1
        // returned by the very first read is never passed to write().
        while (len != -1) {
            sos.write(buffer, 0, len);
            length += len;
            len = dfb.download(buffer);
        }

        System.out.println("HDFSHandler : offset = " + offSet);
        System.out.println("HDFSHandler : length = " + length);
        System.out.println("HDFSHandler : offset + length = " + offSet + "+" + length + "=" + (offSet + length));

        sos.flush();
    } catch (Exception e) {
        Log.logException(e);
    } finally {
        if (dfb != null) // the constructor may have thrown before dfb was assigned
            dfb.close();
    }
}


This method instantiates the custom DownloadInOffset class, which uses the seek method described above to position the stream at the offset. Its code is as follows:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DownloadInOffset {
    private FileSystem hadoopFS = null;
    private Configuration conf = null;
    private FSDataInputStream fsInputStream = null;
    FileSystem hdfs = null;

    public DownloadInOffset(String srcPath) throws IOException {
        srcPath = HDFSHandler.hdfs_path + HDFSHandler.download_path + "/" + srcPath;
        conf = HDFSHandler.conf;
        hadoopFS = FileSystem.get(conf);
        fsInputStream = hadoopFS.open(new Path(srcPath));
    }

    /** Seeks to the given offset, then reads one buffer's worth of data. */
    public int download(byte[] ioBuffer, long offset) throws IOException {
        if (ioBuffer == null) {
            throw new IOException("ioBuffer is null");
        }
        fsInputStream.seek(offset);
        return fsInputStream.read(ioBuffer);
    }

    /** Continues reading from the stream's current position. */
    public int download(byte[] ioBuffer) throws IOException {
        if (ioBuffer == null) {
            throw new IOException("ioBuffer is null");
        }
        return fsInputStream.read(ioBuffer);
    }

    public void close() {
        if (fsInputStream != null) {
            try {
                fsInputStream.close();
                if (hdfs != null) { // only assigned once getFileSize() has run
                    hdfs.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
                Log.logException(e);
            }
        }
    }

    public long getFileSize(String srcPath) throws IOException {
        srcPath = HDFSHandler.hdfs_path + HDFSHandler.download_path + "/" + srcPath;
        conf = HDFSHandler.conf;
        hdfs = FileSystem.get(URI.create(srcPath), conf);
        FileStatus fs = hdfs.getFileStatus(new Path(srcPath));
        return fs.getLen();
    }
}
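As an aside, FSDataInputStream also implements Hadoop's PositionedReadable interface, so the seek-then-read pair in download(byte[], long) could instead use a positioned read, which does not move the stream's current position and is safe to call from multiple threads on one stream. A sketch of that variant (the method name downloadPositioned is hypothetical):

// Variant of download(byte[], long) using a positioned read instead of seek.
// read(position, buffer, offset, length) leaves the stream position untouched.
public int downloadPositioned(byte[] ioBuffer, long offset) throws IOException {
    if (ioBuffer == null) {
        throw new IOException("ioBuffer is null");
    }
    return fsInputStream.read(offset, ioBuffer, 0, ioBuffer.length);
}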


Note that the seek call itself is relatively expensive. If the server must handle many users issuing this operation concurrently, you can pair it with server-side load balancing, or use a producer-consumer scheme that parks excess seek requests in a blocking queue.
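As a rough illustration of that producer-consumer idea, the sketch below parks download tasks in a bounded BlockingQueue and lets a fixed number of worker threads drain it, so only a limited number of seek-heavy reads touch HDFS at once. The class name, queue capacity, and worker count are assumptions chosen for illustration, not values from the original article:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SeekRequestQueue {
    // Bounded queue: producers block once too many downloads are pending.
    private final BlockingQueue<Runnable> pending = new ArrayBlockingQueue<>(64);

    public SeekRequestQueue(int workers) {
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        pending.take().run(); // consumer: one seek-and-stream at a time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            t.setDaemon(true);
            t.start();
        }
    }

    // Producer side: a request thread hands off its download task,
    // waiting if the queue is full.
    public void submit(Runnable downloadTask) throws InterruptedException {
        pending.put(downloadTask);
    }
}

In a servlet container the request thread owns the response stream, so in practice it would wait (for example on a CountDownLatch, or via the Servlet 3.0 async API) until its queued task has run; the sketch shows only the queueing discipline.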