OSS实现多文件多线程的断点上传(java)
2015-05-31 15:29
513 查看
OSS实现多文件多线程的断点上传(java)
本文地址:/article/2083008.html
前面写了关于OSS实现多文件多线程的断点下载的文章，今天写一下如何使用OSS-SDK实现多文件多线程的断点上传。实现方式与断点下载相同，采用mainPool和pool嵌套线程池的方式：父线程池管理并发上传的文件个数，子线程池pool管理每个文件的上传线程数。
uploadMainPool 代码片段:
/** Outer pool: each task uploads one file; at most Constant.CONCURRENT_FILE_NUMBER files at once. */
public static ExecutorService uploadMainPool = null;

static {
    // Create the default factory once. Calling Executors.defaultThreadFactory() inside
    // newThread() builds a new factory (with a new pool number) per thread, so every worker
    // was named "pool-N-thread-1" with a different N.
    final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
    uploadMainPool = Executors.newFixedThreadPool(Constant.CONCURRENT_FILE_NUMBER, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread s = defaultFactory.newThread(r);
            // Daemon threads so idle pool workers do not keep the JVM alive after uploads finish.
            s.setDaemon(true);
            return s;
        }
    });
}
在static块中将其实例化为固定大小的线程池。由于默认ThreadFactory创建的是非守护线程，空闲的池线程会阻止JVM退出；为了保证文件上传完成后Java程序能够正常结束，需要自定义ThreadFactory，使其创建守护线程。
Constant类中自定义了程序中需要使用到的变量,可在类中直接定义或读取配置文件,Constant.CONCURRENT_FILE_NUMBER定义了并发文件数。
uploadMainPool 中的每个线程负责一个文件,每个线程上传文件时创建子线程池,由子线程池分块上传文件;
使用OSS-SDK分块上传必须使用其提供的MultipartUpload上传方式，具体参见OSS的SDK文档。由于OSS会在云端记录同一次上传事件中已上传的碎片（即一次上传操作中的块），我们可以利用这一点实现断点上传。当然，这里的断点并不能精确到每个块内部停止的位置，这是由OSS-SDK的限制决定的；但能够记录已经上传完成的块就已经足够了。
这里仍然使用定时序列化子线程池线程对象的方式：定时将包含上传块信息的线程对象序列化到文件，再次上传同一文件时将其反序列化，直接丢到子线程池上传即可。下面是uploadMainPool中的线程OSSUploadFile的代码清单：
OSSUploadFile代码:
[code]package cloudStorage.oss; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import cloudStorage.basis.Constant; import cloudStorage.basis.Global; import cloudStorage.basis.OSSClientFactory; import cloudStorage.oss.upload.UploadPartObj; import cloudStorage.oss.upload.UploadPartThread; import cloudStorage.util.MD5Util; import cloudStorage.util.ObjectSerializableUtil; import com.aliyun.oss.ClientException; import com.aliyun.oss.OSSClient; import com.aliyun.oss.OSSErrorCode; import com.aliyun.oss.OSSException; import com.aliyun.oss.model.CannedAccessControlList; import com.aliyun.oss.model.CompleteMultipartUploadRequest; import com.aliyun.oss.model.InitiateMultipartUploadRequest; import com.aliyun.oss.model.InitiateMultipartUploadResult; import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.PartETag; import com.aliyun.oss.model.PutObjectResult; /** * @Description: 使用普通方式上传小文件,使用Multipart上传方式进行多线程分段上传较大文件 * @author: zrk * @time: 2015年3月30日 上午10:45:12 */ public class OSSUploadFile implements Callable<Integer>{ public static final Logger LOGGER = Logger.getLogger(OSSUploadFile.class); //外层线程池 public static ExecutorService uploadMainPool = null; static{ uploadMainPool = Executors.newFixedThreadPool(Constant.CONCURRENT_FILE_NUMBER,new ThreadFactory() { public Thread newThread(Runnable r) { Thread s = Executors.defaultThreadFactory().newThread(r); s.setDaemon(true); return s; } }); } //内层线程池 private ExecutorService pool ; private 
String sourcePath;//上次路径 private String bucketName;//bucketName private String key;//云端存储路径 /** * oss上传 支持断点续传 * @param sourcePath 源文件路径 * @param bucketName bucketName * @param key 存储key -在oss的存储路径 */ public OSSUploadFile(String sourcePath,String bucketName,String key) { //实例化单文件上次线程池 pool = Executors.newFixedThreadPool(Constant.SINGLE_FILE_CONCURRENT_THREADS); this.sourcePath = sourcePath; this.bucketName = bucketName; this.key = key; } /** * 执行当前线程 * @return */ @SuppressWarnings("finally") public Integer uploadFile() { Integer r = Global.ERROR; //向uploadMainPool中submit当前线程 Future<Integer> result = uploadMainPool.submit(this); try { r=result.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally{ return r; } } /** * oss上传 * @param sourcePath 源文件路径 * @param bucketName bucketName * @param key 存储key 存储路径 * @return Integer */ @Override public Integer call(){ OSSClient client = OSSClientFactory.getInstance(); File uploadFile = new File(sourcePath); if (!uploadFile.exists()){ LOGGER.info("==无法找到文件:" + sourcePath); return Global.FILE_NOT_FOUND_ERROR; } int result = Global.ERROR; key = key.contains("\\\\")?key.replaceAll("\\\\", "/"):key.contains("\\")?key.replaceAll("\\", "/"):key; // 准备Bucket result = ensureBucket(client,bucketName); if(result == Global.ERROR )return result; // 使用multipart的方式上传文件 result = uploadBigFile(client, bucketName, key, uploadFile); pool = null; return result; } // 通过Multipart的方式上传一个大文件 private int uploadBigFile(OSSClient client, String bucketName, String key, File uploadFile) { //自定义的每个上传分块大小 Integer partSize = Constant.UPLOAD_PART_SIZE; //需要上传的文件分块数 int partCount = calPartCount(uploadFile,partSize); //文件的MD5值 String fileDM5Str = ""; String uploadId = ""; //序列化的文件路径(与上传文件同路径使用.up.temp后缀) String serializationFilePath = sourcePath+".up.temp"; boolean isSerializationFile = false; //子线程池的线程对象封装类(用于序列化的) UploadPartObj uploadPartObj = null; //获取文件MD5值 fileDM5Str = MD5Util.md5Hex(uploadFile); 
//若存在上传失败留下的序列化文件则反序列化对象 if(new File(serializationFilePath).exists()){ uploadPartObj = (UploadPartObj)ObjectSerializableUtil.load(serializationFilePath); isSerializationFile = true; } //序列化文件不存在,分配分块给子线程池线程对象 if(uploadPartObj==null||!isSerializationFile){ uploadPartObj = new UploadPartObj(); try { //初始化MultipartUpload 返回uploadId uploadId = initMultipartUpload(client, bucketName, key,fileDM5Str); } catch (OSSException|ClientException e) { e.printStackTrace(); LOGGER.error("=="+e.getMessage()); return Global.OSS_SUBMIT_ERROR; } for (int i = 0; i < partCount ; i++) { long start = partSize * i; long curPartSize = partSize < uploadFile.length() - start ? partSize : uploadFile.length() - start; //构造上传线程,UploadPartThread是执行每个分块上传任务的线程 uploadPartObj.getUploadPartThreads().add(new UploadPartThread(client, bucketName, key,uploadFile, uploadId, i + 1,partSize * i, curPartSize)); } } try { int i = 0; //upload方法提交分块上传线程至子线程池上传,while循环用于上传失败重复上传,Constant.RETRY定义重复次数 while (upload(uploadPartObj,serializationFilePath).isResult()==false) { if(++i == Constant.RETRY)break; } } catch (Exception e) { e.printStackTrace(); LOGGER.info("==" + e.getMessage()); return Global.THREAD_ERROR; } if(!uploadPartObj.isResult()){ return Global.NETWORK_ERROR; } try { //完成一个multi-part请求。 completeMultipartUpload(client, bucketName, key, uploadPartObj); } catch (Exception e) { e.printStackTrace(); LOGGER.info("==" + e.getMessage()); ObjectSerializableUtil.save(uploadPartObj,serializationFilePath); return Global.OSS_SUBMIT_ERROR; } return Global.SUCCESS; } /** * 多线程上传单个文件 * @param uploadPartObj * @param serializationFilePath * @return */ private UploadPartObj upload(UploadPartObj uploadPartObj,String serializationFilePath){ try { uploadPartObj.setResult(true); //向子线程池中submit单个文件所有分块上传线程 for (int i=0;i<uploadPartObj.getUploadPartThreads().size();i++) { if(uploadPartObj.getUploadPartThreads().get(i).getMyPartETag()==null) pool.submit(uploadPartObj.getUploadPartThreads().get(i)); } 
//shutdown子线程池,池内所上传任务执行结束后停止当前线程池 pool.shutdown(); while (!pool.isTerminated()) { //循环检查线程池,同时在此序列化uploadPartObj ObjectSerializableUtil.save(uploadPartObj,serializationFilePath); pool.awaitTermination(Constant.SERIALIZATION_TIME, TimeUnit.SECONDS); } //判断上传结果 for (UploadPartThread uploadPartThread: uploadPartObj.getUploadPartThreads()) { if(uploadPartThread.getMyPartETag()==null) uploadPartObj.setResult(false); } //上传成功 删除序列化文件 if (uploadPartObj.isResult()==true) ObjectSerializableUtil.delSerlzFile(serializationFilePath); } catch (Exception e) { LOGGER.info("=="+e.getMessage()); } return uploadPartObj; } // 根据文件的大小和每个Part的大小计算需要划分的Part个数。 private static int calPartCount(File f,Integer partSize) { int partCount = (int) (f.length() / partSize); if (f.length() % partSize != 0){ partCount++; } return partCount; } // 创建Bucket private int ensureBucket(OSSClient client, String bucketName){ try { // 创建bucket client.createBucket(bucketName); //设置bucket的访问权限,public-read-write权限 client.setBucketAcl(bucketName, CannedAccessControlList.PublicRead); } catch (OSSException e) { e.printStackTrace(); LOGGER.info("==" + e.getMessage()); return Global.ERROR; } catch (ClientException e) { if (!OSSErrorCode.BUCKET_ALREADY_EXISTS.equals(e.getErrorCode())) { // 如果Bucket已经存在,则忽略 LOGGER.info("==bucketName已经存在"); }else{ e.printStackTrace(); LOGGER.info("=="+e.getMessage()); return Global.ERROR; } } return Global.SUCCESS; } // 初始化一个Multi-part upload请求。 private static String initMultipartUpload(OSSClient client,String bucketName, String key,String fileDM5Str) throws OSSException,ClientException{ ObjectMetadata objectMetadata =new ObjectMetadata(); objectMetadata.getUserMetadata().put(Global.X_OSS_META_MY_MD5,fileDM5Str); InitiateMultipartUploadRequest initUploadRequest = new InitiateMultipartUploadRequest(bucketName, key, objectMetadata); InitiateMultipartUploadResult initResult = client.initiateMultipartUpload(initUploadRequest); String uploadId = initResult.getUploadId(); return uploadId; } 
// 完成一个multi-part请求。 private static void completeMultipartUpload(OSSClient client, String bucketName, String key,UploadPartObj uploadPartObj){ List<PartETag> eTags = new ArrayList<PartETag>(); for (UploadPartThread uploadPartThread : uploadPartObj.getUploadPartThreads()) { eTags.add(new PartETag(uploadPartThread.getMyPartETag().getPartNumber(),uploadPartThread.getMyPartETag().geteTag())); } //为part按partnumber排序 Collections.sort(eTags, new Comparator<PartETag>(){ public int compare(PartETag arg0, PartETag arg1) { PartETag part1= arg0; PartETag part2= arg1; return part1.getPartNumber() - part2.getPartNumber(); } }); CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(bucketName, key, uploadPartObj.getUploadPartThreads().get(0).getUploadId(), eTags); client.completeMultipartUpload(completeMultipartUploadRequest); } }
此处引用了多个类
cloudStorage.basis.Constant;//定义程序中使用的变量
cloudStorage.basis.Global;//定义了全局的静态值,错误状态值
cloudStorage.basis.OSSClientFactory;//OSSClient工厂
cloudStorage.oss.upload.UploadPartObj;//分块上传线程类封装
cloudStorage.oss.upload.UploadPartThread;//分块上传线程
cloudStorage.util.ObjectSerializableUtil;//序列工具类
cloudStorage.util.MD5Util;//MD5工具类
调用上传方法也是阻塞式的，需要给调用者返回结果，所以使用Callable和Future返回int型状态值。下面是子线程池pool中的分块上传线程UploadPartThread的代码清单：
UploadPartThread 代码:
[code]package cloudStorage.oss.upload; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.io.Serializable; import java.util.Date; import java.util.concurrent.Callable; import org.apache.log4j.Logger; import cloudStorage.basis.OSSClientFactory; import cloudStorage.service.InDateService; import com.aliyun.oss.OSSClient; import com.aliyun.oss.model.UploadPartRequest; import com.aliyun.oss.model.UploadPartResult; /** * @Description: 上传每个part的线程类 可序列化 用于上传的断点续传 * @author: zrk * @time: 2015年4月1日 上午10:35:34 */ public class UploadPartThread implements Callable<UploadPartThread> ,Serializable { private static final long serialVersionUID = 1L; public static final Logger LOGGER = Logger.getLogger(UploadPartThread.class); private File uploadFile; private String bucket; private String object; private long start; private long size; private int partId; private String uploadId; private MyPartETag myPartETag; public UploadPartThread(OSSClient client,String bucket, String object, File uploadFile,String uploadId, int partId, long start, long partSize) { this.uploadFile = uploadFile; this.bucket = bucket; this.object = object; this.start = start; this.size = partSize; this.partId = partId; this.uploadId = uploadId; } @Override public UploadPartThread call() { InputStream in = null; try { in = new FileInputStream(uploadFile); in.skip(start); UploadPartRequest uploadPartRequest = new UploadPartRequest(); uploadPartRequest.setBucketName(bucket); uploadPartRequest.setKey(object); uploadPartRequest.setUploadId(uploadId); uploadPartRequest.setInputStream(in); uploadPartRequest.setPartSize(size); uploadPartRequest.setPartNumber(partId); //MyPartETag是对uploadPartResult.getPartETag()的返回值PartETag的封装,主要是为了能序列化PartETag,MyPartETag仅比PartETag多实现了Serializable接口 myPartETag = new MyPartETag(uploadPartResult.getPartETag()); } catch (Exception e) { e.printStackTrace(); LOGGER.error("=="+e.getMessage()); } finally { if (in != null){ try { in.close(); } catch 
(Exception e) { LOGGER.error("==关闭读入流失败:"+e.getMessage()); } } return this; } } public String getUploadId() { return uploadId; } public void setUploadId(String uploadId) { this.uploadId = uploadId; } public MyPartETag getMyPartETag() { return myPartETag; } public void setMyPartETag(MyPartETag myPartETag) { this.myPartETag = myPartETag; } }
每个UploadPartThread上传线程执行结束后都会将自己作为返回值返回；当前文件是否上传完整，由每个线程对象中是否记录了PartETag决定。MyPartETag是对uploadPartResult.getPartETag()返回值PartETag的封装，主要是为了能够序列化PartETag中的值，MyPartETag仅比PartETag多实现了Serializable接口。
ObjectSerializableUtil 代码 点击查看
UploadPartObj 代码:
[code]package cloudStorage.oss.upload; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * 单个文件的上传线程集合 * @Description: TODO * @author: zrk * @time: 2015年5月6日 下午1:56:54 */ public class UploadPartObj implements Serializable{ private static final long serialVersionUID = 1L; List<UploadPartThread> uploadPartThreads = Collections.synchronizedList(new ArrayList<UploadPartThread>()); boolean result = true; public List<UploadPartThread> getUploadPartThreads() { return uploadPartThreads; } public void setUploadPartThreads(List<UploadPartThread> uploadPartThreads) { this.uploadPartThreads = uploadPartThreads; } public boolean isResult() { return result; } public void setResult(boolean result) { this.result = result; } }
所有上传当前文件的线程都会操作uploadPartThreads ,所以uploadPartThreads 使用集合Collections.synchronizedList将其转换为一个线程安全的类。UploadPartObj 封装了UploadPartThreads和一个用于标识上传成功与失败的boolean值,定时保存序列化文件就直接序列化UploadPartObj ,UploadPartObj 的线程安全是至关重要的。
MyPartETag 代码:
[code]package cloudStorage.oss.upload; import java.io.Serializable; import com.aliyun.oss.model.PartETag; /** * @Description: 封装PartETag 用于序列化 * @author: zrk * @time: 2015年4月1日 上午10:52:50 */ public class MyPartETag implements Serializable { private static final long serialVersionUID = 1L; private int partNumber; private String eTag; public MyPartETag(PartETag partETag ) { super(); this.partNumber = partETag.getPartNumber(); this.eTag = partETag.getETag(); } public int getPartNumber() { return partNumber; } public void setPartNumber(int partNumber) { this.partNumber = partNumber; } public String geteTag() { return eTag; } public void seteTag(String eTag) { this.eTag = eTag; } }
OSSClientFactory 代码:
[code]package cloudStorage.basis; import com.aliyun.oss.OSSClient; public class OSSClientFactory { private static OSSClient ossClient = null; private OSSClientFactory() { } public static OSSClient getInstance() { if (ossClient == null) { // 可以使用ClientConfiguration对象设置代理服务器、最大重试次数等参数。 // ClientConfiguration config = new ClientConfiguration(); ossClient = new OSSClient(Constant.OSS_ENDPOINT,Constant.ACCESS_ID, Constant.ACCESS_KEY); } return ossClient; } }
Constant 代码:
[code]package cloudStorage.basis; import cloudStorage.service.OSSConfigService; /** * @Description: * @author: zrk * @time: 2015年4月1日 下午5:22:28 */ public class Constant { public static String OSS_ENDPOINT = "http://oss.aliyuncs.com/"; public static String ACCESS_ID; public static String ACCESS_KEY; public static Integer DOWNLOAD_PART_SIZE ; // 每个上传Part的大小 public static Integer UPLOAD_PART_SIZE ; // 每个上传Part的大小 public static int CONCURRENT_FILE_NUMBER ; // 并发文件数。 public static int SINGLE_FILE_CONCURRENT_THREADS ; // 单文件并发线程数。 public static int RETRY ;//失败重试次数 public static int SERIALIZATION_TIME;//断点保存时间间隔(秒) //。。。 }
Constant中的数值从外部配置文件加载，也可以直接在类中配置；个别参数只在断点下载时使用，只关注上传的话请忽略。
Global代码:
package cloudStorage.basis;

/**
 * Global constants: the integer status codes returned by the upload code, and the
 * object-metadata key it writes.
 *
 * @author zrk (2015-04-01)
 */
public class Global {

    // ---- status codes ----

    /** Operation completed successfully. */
    public static final int SUCCESS = 1;

    /** Generic failure. */
    public static final int ERROR = 10;

    /** Source file does not exist. */
    public static final int FILE_NOT_FOUND_ERROR = 11;

    /** Failure while running or waiting for worker threads. */
    public static final int THREAD_ERROR = 12;

    /** Parts still missing after all upload attempts. */
    public static final int NETWORK_ERROR = 13;

    /** An OSS request (initiate/complete multipart upload) failed. */
    public static final int OSS_SUBMIT_ERROR = 14;

    // ---- object metadata ----

    /** User-metadata key under which the source file's MD5 is stored. */
    public static final String X_OSS_META_MY_MD5 = "x-oss-meta-my-md5";
}
调用上传方式:
实例化OSSUploadFile后调用uploadFile方法:
return new OSSUploadFile(sourcePath,bucketName, key).uploadFile();
以上多线程上传的实现供大家参考，后续我会在使用过程中继续优化。基于OSS-SDK的分块断点下载请参考《OSS实现多文件多线程的断点下载(java)》。
本文地址:/article/2083008.html
相关文章推荐
- 解决JAVA字符串长度与数据库字符串长度不一致问题
- 疯狂Java学习笔记(62)------------线程初识
- Java基本语法学习时需要注意的几点
- java集合排序
- Java实现二叉排序(查找)树的操作
- java学习之集合框架工具类
- 一款工具助你减少java代码bug
- Java反射基础(二)— Class类
- java io操作整理
- java复习(五)
- Dijkstra算法java现实
- JAVA 文档注释,类的说明,HTML说明文档的生成
- 设置MyEclipse的代码自动提示功能
- 个人认为讲解java异常最好的文章
- SpringMVC介绍之Validation
- Spring2.5学习2.3_如何注入基本类型
- Eclipse Luna安装Activiti Designer报错解决方案
- java线程中run和start方法的区别
- java文件读取全了解
- Vijava 学习笔记(指定虚拟机添加虚拟磁盘修订功能版本)