您的位置:首页 > 移动开发

HDFS下断点续传的实现——上传

2017-06-02 12:22 148 查看
本文提供了使用HDFS的服务端在用户请求上传文件时保证断点续传的方法,实现思想为记录偏移量 + Hadoop提供的内容追加api——append方法。

1.Hbase中建file_offset表用于记录用户未完成上传文件的偏移量,表行键row由文件的MD5 + 用户名唯一确定;

2.客户端请求上传文件时,服务端首先根据MD5码在file_offset表中查找该文件,若不存在则为首次上传,若已存在则为断点续传,应从表中读取到偏移量字段传给客户端,客户端从服务端拿到的偏移量往后开始传输文件流;

从Hadoop提供的Api文档中可知,HDFS中在已有文件后追加内容可以使用append方法,不过此前需要在./hadoop/etc/hadoop下的hdfs-site.xml文件中配置如下项:

<property>
<name>dfs.support.append</name>
<value>true</value>
</property>

实现从客户端上传文件的断点续传代码如下:

/**
* 使用偏移量方法将文件上传至HDFS
*
* @param encryptfilename
*            加密文件名
* @param request
*            http请求,从request中读取文件数据
* @return
* @throws Exception
*/
public static long upload2HDFSinOffset(String encryptfilename, HttpServletRequest request) throws Exception {

if (encryptfilename == null || encryptfilename.equals(""))
return 0;

FileSystem hadoopFS = null;
Configuration conf = null;
long length = 0;

String despath = HDFSHandler.hdfs_path + HDFSHandler.user_path + "/" + encryptfilename;
conf = HDFSHandler.conf;

Log.logger.debug("create files in hdfs");
try {

if (FileLogic.getAUsersoffset(request) <= 0) {
hadoopFS = FileSystem.get(conf);
FSDataOutputStream fsOutputStream = null;
// 偏移量为0,首次上传,create方法;

if (!hadoopFS.exists(new Path(despath))) {
fsOutputStream = hadoopFS.create(new Path(despath));
hadoopFS.close();
hadoopFS = FileSystem.get(conf);
} else {
fsOutputStream = hadoopFS.create(new Path(despath));
}

ServletInputStream fos = request.getInputStream();
byte[] buffer = new byte[1024];
int len = 0;

while ((len = fos.read(buffer)) != -1) {
fsOutputStream.write(buffer, 0, len);
length += len;
}
fsOutputStream.flush();
fsOutputStream.close();
fos.close();
hadoopFS.close();
System.out.println("HDFSHandler if return :" + length);
return length;
} else {
//偏移量非0,文件续传,append方法
hadoopFS = FileSystem.get(conf);if (!hadoopFS.exists(new Path(despath))) {hadoopFS.create(new Path(despath));hadoopFS.close();hadoopFS = FileSystem.get(conf);}FSDataOutputStream fsOutputStream2 =
null;// fsOutputStream2 = hadoopFS.append(new Path(despath2));fsOutputStream2 = hadoopFS.append(new Path(despath));ServletInputStream fos2 = request.getInputStream();byte[] buffer = new byte[1024];int len = 0;while ((len = fos2.read(buffer)) != -1) {fsOutputStream2.write(buffer,
0, len);length += len;}fsOutputStream2.flush();fsOutputStream2.close();fos2.close();hadoopFS.close();return length;}} catch (Exception e) {// 用户中断上传,传回已接收到的文件长度(记录在偏移量表中,以待用户断线续传时传给用户)return length;}}


注:Hbase的文件(file)表中也可以由MD5码作为表行键或某项字段,由此可判断系统中是否已存在该文件,以针对不同用户上传同一文件时可达到秒传大文件
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息