
Multi-file, multi-threaded resumable upload with OSS (Java)

2015-05-31 15:29


Article URL: /article/2083008.html

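An earlier article covered multi-file, multi-threaded resumable downloads with OSS. This one implements multi-file, multi-threaded resumable uploads with the OSS SDK. It uses the same nested thread pools as the download (mainPool and pool). The parent pool limits how many files are processed at once, and each file's child pool limits how many upload threads work on that file.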
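The nested-pool layout can be shown without the OSS SDK. In the minimal sketch below, `NestedPoolSketch`, `submitFile` and `uploadPart` are hypothetical names. `uploadPart` only stands in for a real part upload. The point is the shape: a bounded outer pool for files, and a short-lived inner pool per file for its parts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/** Hypothetical sketch: outer pool bounds concurrent files, each file gets its own inner pool for parts. */
class NestedPoolSketch {
    private final ExecutorService filePool;   // outer pool: how many files run at once
    private final int threadsPerFile;         // size of each file's inner pool

    NestedPoolSketch(int concurrentFiles, int threadsPerFile) {
        this.filePool = Executors.newFixedThreadPool(concurrentFiles);
        this.threadsPerFile = threadsPerFile;
    }

    /** Submits one file; the future completes with the number of parts processed. */
    Future<Integer> submitFile(String name, int partCount) {
        return filePool.submit(() -> {
            ExecutorService partPool = Executors.newFixedThreadPool(threadsPerFile);
            try {
                List<Future<Integer>> parts = new ArrayList<>();
                for (int p = 1; p <= partCount; p++) {
                    final int partNumber = p;
                    parts.add(partPool.submit(() -> uploadPart(name, partNumber)));
                }
                int done = 0;
                for (Future<Integer> f : parts) {
                    f.get();          // rethrows if a part failed
                    done++;
                }
                return done;
            } finally {
                partPool.shutdown();  // the inner pool lives only as long as its file
            }
        });
    }

    /** Stand-in for a real part upload (hypothetical). */
    private int uploadPart(String file, int partNumber) {
        return partNumber;
    }

    void shutdown() {
        filePool.shutdown();
    }
}
```

Each file's inner pool is created and shut down inside the file task. As a result, the total number of part threads is bounded by concurrentFiles × threadsPerFile.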

uploadMainPool code snippet:

[code]    public static ExecutorService uploadMainPool = null;
    static{
        uploadMainPool = Executors.newFixedThreadPool(Constant.CONCURRENT_FILE_NUMBER,new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread s = Executors.defaultThreadFactory().newThread(r);
                s.setDaemon(true);
                return s;
            }
        });
    }


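uploadMainPool is created in a static initializer as a fixed-size thread pool. Threads created by the default ThreadFactory are non-daemon, and live non-daemon threads keep the JVM from exiting. The ThreadFactory is therefore overridden to create daemon threads, so the Java program can end once the files have been uploaded.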
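On Java 8 and later, the same daemon-thread factory can be written more compactly with a lambda. This is a minimal sketch, and `DaemonPools` / `newDaemonFixedPool` are hypothetical helper names. It still delegates to `Executors.defaultThreadFactory()` so thread naming and priority stay as before.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helper: a fixed-size pool whose threads do not keep the JVM alive. */
final class DaemonPools {
    private DaemonPools() {}

    static ExecutorService newDaemonFixedPool(int threads) {
        ThreadFactory base = Executors.defaultThreadFactory();
        return Executors.newFixedThreadPool(threads, r -> {
            Thread t = base.newThread(r);   // keep default naming and priority
            t.setDaemon(true);              // daemon: does not block JVM exit
            return t;
        });
    }
}
```

Daemon threads are stopped abruptly when the JVM exits. The caller therefore still has to wait for the results before main returns.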


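The Constant class holds the settings the program uses. They can be defined directly in the class or read from a configuration file. Constant.CONCURRENT_FILE_NUMBER is the number of files uploaded concurrently.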


Each thread in uploadMainPool handles one file. While uploading, that thread creates a child thread pool, and the child pool uploads the file in parts.

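Uploading in parts with the OSS SDK requires its MultipartUpload API; see the OSS SDK documentation for details. OSS records on the server side the parts (the blocks of a single upload operation) already uploaded for a given upload, and resumable upload can be built on that. The resume point is not the exact byte at which an individual part stopped. That is a limitation of the OSS SDK, and an interrupted part has to be uploaded again in full. Recording which parts have already been uploaded is enough, though.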
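A multipart upload runs in three steps. First, initiate it to get an uploadId. Next, upload parts numbered from 1. Finally, complete it with the list of (partNumber, ETag) pairs. The only arithmetic on the client side is splitting the file into parts. The SDK-free sketch below uses hypothetical names (`PartPlan`, `Part`) and `long` offsets, so files larger than 2 GB do not overflow `int`. OSS also documents limits on part size and on the number of parts per upload, which are worth checking when choosing Constant.UPLOAD_PART_SIZE.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split fileLength bytes into parts of at most partSize bytes. */
final class PartPlan {
    /** One part; OSS part numbers start at 1. */
    static final class Part {
        final int partNumber;
        final long offset;
        final long size;

        Part(int partNumber, long offset, long size) {
            this.partNumber = partNumber;
            this.offset = offset;
            this.size = size;
        }
    }

    static int partCount(long fileLength, long partSize) {
        // ceiling division: a trailing partial part still counts
        return (int) ((fileLength + partSize - 1) / partSize);
    }

    static List<Part> plan(long fileLength, long partSize) {
        List<Part> parts = new ArrayList<>();
        int count = partCount(fileLength, partSize);
        for (int i = 0; i < count; i++) {
            long offset = (long) i * partSize;                    // long math: no int overflow
            long size = Math.min(partSize, fileLength - offset);  // the last part may be shorter
            parts.add(new Part(i + 1, offset, size));
        }
        return parts;
    }
}
```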

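As with the download, resumability relies on periodically serializing the child pool's task objects. At a fixed interval, the tasks, which carry the per-part upload information, are serialized to a file. When the same file is uploaded again, they are deserialized and submitted directly to the child pool. The task that runs in uploadMainPool is the OSSUploadFile class.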
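The "checkpoint on a timer while the pool drains" pattern can be isolated as below. This is a hypothetical, SDK-free sketch, and `CheckpointingRunner` and the `checkpoint` callback are illustrative names. In this article's code the callback corresponds to ObjectSerializableUtil.save(uploadPartObj, serializationFilePath). A fresh pool is created per call because an ExecutorService that has been shut down rejects new tasks.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run tasks on a fresh pool, checkpointing periodically until all finish. */
final class CheckpointingRunner {
    /** Returns how many times the checkpoint callback ran. */
    static int runAll(List<Runnable> tasks, int threads, long intervalMillis, Runnable checkpoint)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Runnable task : tasks) {
            pool.submit(task);
        }
        pool.shutdown();                   // accept no new tasks; queued ones still run
        int checkpoints = 0;
        while (!pool.isTerminated()) {
            checkpoint.run();              // persist the progress made so far
            checkpoints++;
            pool.awaitTermination(intervalMillis, TimeUnit.MILLISECONDS);
        }
        checkpoint.run();                  // final state once every task has finished
        return checkpoints + 1;
    }
}
```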

OSSUploadFile code:

[code]package cloudStorage.oss;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import cloudStorage.basis.Constant;
import cloudStorage.basis.Global;
import cloudStorage.basis.OSSClientFactory;
import cloudStorage.oss.upload.UploadPartObj;
import cloudStorage.oss.upload.UploadPartThread;
import cloudStorage.util.MD5Util;
import cloudStorage.util.ObjectSerializableUtil;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSErrorCode;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.CannedAccessControlList;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult;

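/**
 * @Description: Uploads small files with a plain upload and larger files with multi-threaded multipart upload
 *               (only the multipart path is included in this listing)
 * @author: zrk
 * @time: 2015-03-30 10:45:12
 */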
public class OSSUploadFile implements Callable<Integer>{

    public static final Logger LOGGER = Logger.getLogger(OSSUploadFile.class);

    // Outer pool: one task per file
    public static ExecutorService uploadMainPool = null;
    static{
        uploadMainPool = Executors.newFixedThreadPool(Constant.CONCURRENT_FILE_NUMBER,new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread s = Executors.defaultThreadFactory().newThread(r);
                s.setDaemon(true);
                return s;
            }
        });
    }
    // Inner pool: uploads the parts of this file
    private ExecutorService pool ;
    private String sourcePath;// local path of the file to upload
    private String bucketName;// bucketName
    private String key;// object key (storage path in OSS)
    /**
     * OSS upload with resume support
     * @param sourcePath    source file path
     * @param bucketName    bucketName
     * @param key           object key, i.e. the storage path in OSS
     */
    public OSSUploadFile(String sourcePath,String bucketName,String key) {
        // Create the upload pool for this single file
        pool = Executors.newFixedThreadPool(Constant.SINGLE_FILE_CONCURRENT_THREADS);
        this.sourcePath = sourcePath;
        this.bucketName = bucketName;
        this.key = key;
    }

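    /**
     * Submits this task to uploadMainPool and blocks until it finishes
     * @return status code defined in Global
     */
    public Integer uploadFile() {
        // Submit the current task to uploadMainPool
        Future<Integer> result = uploadMainPool.submit(this);
        try {
            return result.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return Global.ERROR;
    }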

    /**
     * OSS upload of the file given to the constructor (sourcePath, bucketName, key)
     * @return Integer status code defined in Global
     */
    @Override
    public Integer call(){

        OSSClient client  = OSSClientFactory.getInstance();

        File uploadFile = new File(sourcePath);
        if (!uploadFile.exists()){
            LOGGER.info("==File not found: " + sourcePath);
            return Global.FILE_NOT_FOUND_ERROR;
        }
        int result = Global.ERROR;

        // OSS keys use '/' as the separator: convert Windows backslashes
        key = key.replace('\\', '/');
        // Make sure the bucket exists
        result = ensureBucket(client,bucketName);
        if(result == Global.ERROR )return result;
        // Upload the file with multipart upload
        result = uploadBigFile(client, bucketName, key, uploadFile);
        pool = null;
        return result;
    }

    // Upload a large file with multipart upload
    private int uploadBigFile(OSSClient client, String bucketName, String key,
            File uploadFile)  {

        // Configured size of each upload part
        Integer partSize = Constant.UPLOAD_PART_SIZE;

        // Number of parts the file is split into
        int partCount = calPartCount(uploadFile,partSize);

        // MD5 of the file
        String fileDM5Str = "";
        String uploadId = "";

        // Checkpoint file path (the upload file's path plus a .up.temp suffix)
        String serializationFilePath = sourcePath+".up.temp";

        boolean isSerializationFile = false;

        // Wrapper around the child pool's task objects (this is what gets serialized)
        UploadPartObj uploadPartObj = null;
        // Compute the file's MD5
        fileDM5Str = MD5Util.md5Hex(uploadFile);

        // If a checkpoint left by a failed upload exists, deserialize it
        if(new File(serializationFilePath).exists()){
            uploadPartObj = (UploadPartObj)ObjectSerializableUtil.load(serializationFilePath);
            isSerializationFile = true;
        }

        // No usable checkpoint: start a new multipart upload and assign the parts to task objects
        if(uploadPartObj==null||!isSerializationFile){
            uploadPartObj = new UploadPartObj();
            try {
                // Initiate the multipart upload; OSS returns its uploadId
                uploadId = initMultipartUpload(client, bucketName, key,fileDM5Str);
            } catch (OSSException|ClientException e) {
                e.printStackTrace();
                LOGGER.error("=="+e.getMessage());
                return Global.OSS_SUBMIT_ERROR;
            }
            for (int i = 0; i < partCount ; i++) {
                // Cast to long first: partSize * i overflows int for files larger than 2 GB
                long start = (long) partSize * i;
                long curPartSize = Math.min((long) partSize, uploadFile.length() - start);
                // UploadPartThread is the task that uploads one part; part numbers start at 1
                uploadPartObj.getUploadPartThreads().add(new UploadPartThread(client, bucketName, key,uploadFile, uploadId, i + 1, start, curPartSize));
            }
        }

        try {
            int i = 0;
            // upload() submits the part tasks to the child pool; the loop retries failed parts up to Constant.RETRY times
            while (upload(uploadPartObj,serializationFilePath).isResult()==false) {
                if(++i == Constant.RETRY)break;
            }
        } catch (Exception e) {
            e.printStackTrace();
            LOGGER.info("==" + e.getMessage());
            return Global.THREAD_ERROR;
        }

        if(!uploadPartObj.isResult()){
            return Global.NETWORK_ERROR;
        }
        try {
            // Complete the multipart upload
            completeMultipartUpload(client, bucketName, key, uploadPartObj);
        } catch (Exception e) {
            e.printStackTrace();
            LOGGER.info("==" + e.getMessage());
            ObjectSerializableUtil.save(uploadPartObj,serializationFilePath);
            return Global.OSS_SUBMIT_ERROR;
        }
        return Global.SUCCESS;
    }

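    /**
     * Uploads the parts of one file with multiple threads
     * @param uploadPartObj
     * @param serializationFilePath
     * @return uploadPartObj, whose result flag is true only if every part has an ETag
     */
    private UploadPartObj upload(UploadPartObj uploadPartObj,String serializationFilePath){

        try {

            uploadPartObj.setResult(true);

            // A shut-down pool rejects new tasks, so every retry needs a fresh child pool
            if (pool == null || pool.isShutdown()) {
                pool = Executors.newFixedThreadPool(Constant.SINGLE_FILE_CONCURRENT_THREADS);
            }

            // Submit every part of this file that has not been uploaded yet
            for (int i=0;i<uploadPartObj.getUploadPartThreads().size();i++) {
                if(uploadPartObj.getUploadPartThreads().get(i).getMyPartETag()==null)
                    pool.submit(uploadPartObj.getUploadPartThreads().get(i));
            }

            // shutdown(): accept no new tasks; the pool stops once the submitted uploads finish
            pool.shutdown();
            while (!pool.isTerminated()) {
                // While waiting, periodically serialize uploadPartObj as the checkpoint
                ObjectSerializableUtil.save(uploadPartObj,serializationFilePath);
                pool.awaitTermination(Constant.SERIALIZATION_TIME, TimeUnit.SECONDS);
            }

            // Check the result: a part without an ETag failed
            for (UploadPartThread uploadPartThread: uploadPartObj.getUploadPartThreads()) {
                if(uploadPartThread.getMyPartETag()==null)
                    uploadPartObj.setResult(false);
            }
            // Upload succeeded: delete the checkpoint file
            if (uploadPartObj.isResult()==true)
                ObjectSerializableUtil.delSerlzFile(serializationFilePath);
        } catch (Exception e) {
            // Treat any error as a failed attempt so the caller retries instead of completing
            uploadPartObj.setResult(false);
            LOGGER.info("=="+e.getMessage());
        }
        return uploadPartObj;

    }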

    // Compute how many parts are needed from the file size and the part size.
    private static int calPartCount(File f,Integer partSize) {
        int partCount = (int) (f.length() / partSize);
        if (f.length() % partSize != 0){
            partCount++;
        }
        return partCount;
    }

    // Create the bucket if it does not exist yet
    private int ensureBucket(OSSClient client, String bucketName){
            try {
                // Create the bucket
                client.createBucket(bucketName);
                // Set the bucket ACL to public-read
                client.setBucketAcl(bucketName, CannedAccessControlList.PublicRead);
            } catch (OSSException e) {
                if (OSSErrorCode.BUCKET_ALREADY_EXISTS.equals(e.getErrorCode())) {
                    // The bucket already exists: ignore
                    LOGGER.info("==bucket already exists: " + bucketName);
                } else {
                    e.printStackTrace();
                    LOGGER.info("==" + e.getMessage());
                    return Global.ERROR;
                }
            } catch (ClientException e) {
                e.printStackTrace();
                LOGGER.info("=="+e.getMessage());
                return Global.ERROR;
            }
            return Global.SUCCESS;
    }

    // Initiate a multipart upload request.
    private static String initMultipartUpload(OSSClient client,String bucketName, String key,String fileDM5Str) throws OSSException,ClientException{
        ObjectMetadata objectMetadata =new ObjectMetadata();
        objectMetadata.getUserMetadata().put(Global.X_OSS_META_MY_MD5,fileDM5Str);
        InitiateMultipartUploadRequest initUploadRequest = new InitiateMultipartUploadRequest(bucketName, key, objectMetadata);
        InitiateMultipartUploadResult initResult = client.initiateMultipartUpload(initUploadRequest);
        String uploadId = initResult.getUploadId();
        return uploadId;
    }

    // Complete a multipart upload request.
    private static void completeMultipartUpload(OSSClient client, String bucketName, String key,UploadPartObj uploadPartObj){
        List<PartETag> eTags = new ArrayList<PartETag>();
        for (UploadPartThread uploadPartThread : uploadPartObj.getUploadPartThreads()) {
            eTags.add(new PartETag(uploadPartThread.getMyPartETag().getPartNumber(),uploadPartThread.getMyPartETag().geteTag()));
        }
        // Sort the parts by part number (OSS expects them in ascending order)
        Collections.sort(eTags, new Comparator<PartETag>(){
            public int compare(PartETag part1, PartETag part2) {
                return Integer.compare(part1.getPartNumber(), part2.getPartNumber());
            }
        });
        CompleteMultipartUploadRequest completeMultipartUploadRequest =
                new CompleteMultipartUploadRequest(bucketName, key, uploadPartObj.getUploadPartThreads().get(0).getUploadId(), eTags);
        client.completeMultipartUpload(completeMultipartUploadRequest);
    }
}


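The listing references several helper classes:

cloudStorage.basis.Constant; // settings used by the program

cloudStorage.basis.Global; // global static values, including the error status codes

cloudStorage.basis.OSSClientFactory; // OSSClient factory

cloudStorage.oss.upload.UploadPartObj; // wrapper around the part-upload tasks

cloudStorage.oss.upload.UploadPartThread; // part-upload task

cloudStorage.util.ObjectSerializableUtil; // serialization utility

cloudStorage.util.MD5Util; // MD5 utility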
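MD5Util itself is not listed. A JDK-only stand-in for MD5Util.md5Hex(File) is sketched below, using `MessageDigest`; the class name `Md5Sketch` is hypothetical. Note that uploadBigFile computes this hash but does not compare it with the checkpoint when resuming. Storing the hash in the checkpoint and checking it would stop a modified file at the same path from resuming against stale parts.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical stand-in for MD5Util.md5Hex(File): streams the file through MD5. */
final class Md5Sketch {
    static String md5Hex(File file) throws IOException {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
        byte[] buf = new byte[8192];
        try (InputStream in = Files.newInputStream(file.toPath())) {
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);      // hash in chunks instead of loading the whole file
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }
}
```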


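Calling the upload method is blocking, and the caller needs a result, so Callable and Future are used to return an int status code. The tasks run by the child pool pool are UploadPartThread instances, each of which uploads one part.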
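The blocking submit-and-wait that uploadFile() performs can be sketched on its own. This version returns from inside `try` rather than from `finally`, because a `return` in `finally` silently discards exceptions. It also restores the interrupt flag. `BlockingSubmit` is a hypothetical name, and its constants only mirror the values of Global.SUCCESS and Global.ERROR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical sketch: submit a Callable<Integer> and block for its status code. */
final class BlockingSubmit {
    static final int SUCCESS = 1;  // mirrors Global.SUCCESS
    static final int ERROR = 10;   // mirrors Global.ERROR

    static int submitAndWait(ExecutorService pool, Callable<Integer> task) {
        Future<Integer> future = pool.submit(task);
        try {
            return future.get();                 // blocks until the task finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // keep the interrupt visible to callers
            future.cancel(true);
            return ERROR;
        } catch (ExecutionException e) {
            return ERROR;                        // the task itself threw
        }
    }
}
```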

UploadPartThread code:

[code]package cloudStorage.oss.upload;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

import org.apache.log4j.Logger;

import cloudStorage.basis.OSSClientFactory;

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult;

/**
 * @Description: Task that uploads one part; serializable so uploads can be resumed
 * @author: zrk
 * @time: 2015-04-01 10:35:34
 */
public class UploadPartThread implements Callable<UploadPartThread> ,Serializable {

    private static final long serialVersionUID = 1L;
    public static final Logger LOGGER = Logger.getLogger(UploadPartThread.class);

    private File uploadFile;
    private String bucket;
    private String object;
    private long start;
    private long size;
    private int partId;
    private String uploadId;

    // Written by the worker thread and read by the checkpoint thread, hence volatile
    private volatile MyPartETag myPartETag;

    // The client is not kept in a field, which keeps this class serializable; call() uses the shared client
    public UploadPartThread(OSSClient client,String bucket, String object,
            File uploadFile,String uploadId, int partId,
            long start, long partSize) {
        this.uploadFile = uploadFile;
        this.bucket = bucket;
        this.object = object;
        this.start = start;
        this.size = partSize;
        this.partId = partId;
        this.uploadId = uploadId;
    }

    @Override
    public UploadPartThread call() {

        InputStream in = null;
        try {
            in = new FileInputStream(uploadFile);
            // skip() may skip fewer bytes than requested, so loop until the part's offset is reached
            long skipped = 0;
            while (skipped < start) {
                long n = in.skip(start - skipped);
                if (n <= 0) throw new IOException("Cannot skip to offset " + start);
                skipped += n;
            }

            UploadPartRequest uploadPartRequest = new UploadPartRequest();
            uploadPartRequest.setBucketName(bucket);
            uploadPartRequest.setKey(object);
            uploadPartRequest.setUploadId(uploadId);
            uploadPartRequest.setInputStream(in);
            uploadPartRequest.setPartSize(size);
            uploadPartRequest.setPartNumber(partId);
            // Upload this part
            OSSClient client = OSSClientFactory.getInstance();
            UploadPartResult uploadPartResult = client.uploadPart(uploadPartRequest);
            // MyPartETag wraps the PartETag returned by uploadPartResult.getPartETag() so it can be serialized; it only adds Serializable
            myPartETag = new MyPartETag(uploadPartResult.getPartETag());
        } catch (Exception e) {
            e.printStackTrace();
            LOGGER.error("=="+e.getMessage());
        } finally {
            if (in != null){
                try {
                    in.close();
                } catch (Exception e) {
                    LOGGER.error("==Failed to close input stream: "+e.getMessage());
                }
            }
        }
        return this;
    }

    public String getUploadId() {
        return uploadId;
    }

    public void setUploadId(String uploadId) {
        this.uploadId = uploadId;
    }

    public MyPartETag getMyPartETag() {
        return myPartETag;
    }

    public void setMyPartETag(MyPartETag myPartETag) {
        this.myPartETag = myPartETag;
    }
}


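Each UploadPartThread returns itself when it finishes. Whether the whole file has been uploaded is decided from these return values: a part whose MyPartETag is still null did not succeed. MyPartETag wraps the PartETag returned by uploadPartResult.getPartETag() so that its values can be serialized. The only difference is that MyPartETag implements Serializable.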
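This paragraph contains two ideas, and both can be sketched without the SDK. The first is keeping a serializable copy of each part's ETag. The second is that the file is complete only when every part has one. `PartRecord` and `ResumeState` are hypothetical names, and the ETag strings in the usage are made-up placeholders. `roundTrip` serializes to memory, standing in for the checkpoint file.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical serializable record of one part, similar to MyPartETag. */
class PartRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    final int partNumber;
    String eTag;                 // null until the part upload succeeds

    PartRecord(int partNumber) {
        this.partNumber = partNumber;
    }
}

final class ResumeState {
    /** Part numbers that still need uploading (no ETag recorded yet). */
    static List<Integer> pendingParts(List<PartRecord> parts) {
        List<Integer> pending = new ArrayList<>();
        for (PartRecord p : parts) {
            if (p.eTag == null) {
                pending.add(p.partNumber);
            }
        }
        return pending;
    }

    /** Serializes and deserializes the records, as a checkpoint file would. */
    @SuppressWarnings("unchecked")
    static List<PartRecord> roundTrip(List<PartRecord> parts) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ArrayList<>(parts));
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (List<PartRecord>) in.readObject();
        }
    }
}
```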

The ObjectSerializableUtil code is only linked from the original post and is not reproduced here.
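The call sites are save(obj, path), load(path) returning an object or null, and delSerlzFile(path). From those, a minimal JDK-only reconstruction could look like the one below. It is a hypothetical sketch, not the author's code. save writes to a temporary file and then moves it over the checkpoint, which lowers the chance of leaving a half-written checkpoint if the process dies mid-save.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/** Hypothetical reconstruction of ObjectSerializableUtil, inferred from its call sites. */
final class ObjectSerializableUtil {
    private ObjectSerializableUtil() {}

    /** Writes obj to a temp file, then moves it over path; returns false on failure. */
    static boolean save(Serializable obj, String path) {
        Path target = Paths.get(path);
        Path tmp = Paths.get(path + ".tmp");
        try {
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(tmp))) {
                out.writeObject(obj);
            }
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Returns the deserialized object, or null if the file is missing or unreadable. */
    static Object load(String path) {
        Path file = Paths.get(path);
        if (!Files.exists(file)) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            return null;   // treat a corrupt checkpoint like a missing one
        }
    }

    /** Deletes the checkpoint file; returns true if a file was deleted. */
    static boolean delSerlzFile(String path) {
        try {
            return Files.deleteIfExists(Paths.get(path));
        } catch (IOException e) {
            return false;
        }
    }
}
```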

UploadPartObj code:

[code]package cloudStorage.oss.upload;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * Collection of the upload tasks for a single file
 * @Description: TODO
 * @author: zrk
 * @time: 2015-05-06 13:56:54
 */
public class UploadPartObj implements Serializable{

    private static final long serialVersionUID = 1L;

    List<UploadPartThread> uploadPartThreads = Collections.synchronizedList(new ArrayList<UploadPartThread>());

    boolean result = true;

    public List<UploadPartThread> getUploadPartThreads() {
        return uploadPartThreads;
    }
    public void setUploadPartThreads(List<UploadPartThread> uploadPartThreads) {
        this.uploadPartThreads = uploadPartThreads;
    }
    public boolean isResult() {
        return result;
    }
    public void setResult(boolean result) {
        this.result = result;
    }
}


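All the tasks uploading the current file share uploadPartThreads, so the list is wrapped with Collections.synchronizedList to make it thread-safe. UploadPartObj wraps uploadPartThreads together with a boolean flag that records success or failure. The periodic checkpoint serializes UploadPartObj directly, so its thread safety is essential. Note that synchronizedList only guards access to the list itself, including its serialization. The worker threads do not change the list. Each one writes the myPartETag field of its own element, so that field should be volatile (or otherwise synchronized) for the checkpoint thread to see the update reliably.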
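A minimal sketch of sharing per-part state this way uses hypothetical names (`Slot`, `ProgressSnapshot`). Worker threads write only their own element's volatile field. The checkpoint side iterates the list inside `synchronized (list)`, which the Collections.synchronizedList Javadoc requires for iteration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical per-part slot; volatile makes a worker's write visible to other threads. */
class Slot {
    final int partNumber;
    volatile String eTag;

    Slot(int partNumber) {
        this.partNumber = partNumber;
    }
}

final class ProgressSnapshot {
    static List<Slot> newSlots(int count) {
        List<Slot> slots = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= count; i++) {
            slots.add(new Slot(i));
        }
        return slots;
    }

    /** Counts finished parts; iterating a synchronizedList must hold the list's own lock. */
    static int finishedCount(List<Slot> slots) {
        int done = 0;
        synchronized (slots) {
            for (Slot s : slots) {
                if (s.eTag != null) {
                    done++;
                }
            }
        }
        return done;
    }
}
```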

MyPartETag code:

[code]package cloudStorage.oss.upload;
import java.io.Serializable;
import com.aliyun.oss.model.PartETag;
/**
 * @Description: Wraps PartETag so it can be serialized
 * @author: zrk
 * @time: 2015-04-01 10:52:50
 */
public class MyPartETag implements Serializable {

    private static final long serialVersionUID = 1L;

    private int partNumber;

    private String eTag;

    public MyPartETag(PartETag partETag ) {
        super();
        this.partNumber = partETag.getPartNumber();
        this.eTag = partETag.getETag();
    }

    public int getPartNumber() {
        return partNumber;
    }

    public void setPartNumber(int partNumber) {
        this.partNumber = partNumber;
    }

    public String geteTag() {
        return eTag;
    }

    public void seteTag(String eTag) {
        this.eTag = eTag;
    }
}


OSSClientFactory code:

[code]package cloudStorage.basis;
import com.aliyun.oss.OSSClient;
public class OSSClientFactory {
    private static OSSClient ossClient = null;
    private OSSClientFactory() {
    }
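    // synchronized: pool threads call this concurrently; without it, two clients could be created
    public static synchronized OSSClient getInstance() {
        if (ossClient == null) {
            // A ClientConfiguration object can set a proxy server, the maximum number of retries, and other options.
            // ClientConfiguration config = new ClientConfiguration();
            ossClient = new OSSClient(Constant.OSS_ENDPOINT,Constant.ACCESS_ID, Constant.ACCESS_KEY);
        }
        return ossClient;
    }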
}


Constant code:

[code]package cloudStorage.basis;

import cloudStorage.service.OSSConfigService;

/**
 * @Description: 
 * @author: zrk
 * @time: 2015-04-01 17:22:28
 */
public class Constant {
    public static String OSS_ENDPOINT = "http://oss.aliyuncs.com/";
    public static String ACCESS_ID;
    public static String ACCESS_KEY;
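    public static Integer DOWNLOAD_PART_SIZE ; // size of each download part (used by the resumable download)
    public static Integer UPLOAD_PART_SIZE ; // size of each upload part
    public static int CONCURRENT_FILE_NUMBER ; // number of files processed concurrently
    public static int SINGLE_FILE_CONCURRENT_THREADS ; // number of concurrent threads per file
    public static int RETRY ;// number of retries after a failure
    public static int SERIALIZATION_TIME;// checkpoint interval (seconds)
    // ...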
}


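The values in Constant are loaded from an external configuration file, though they can also be set directly here. A few of them are only used by the resumable download and can be ignored if you only care about uploads.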
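The loading code itself is elided in Constant. One common way to fill such fields is `java.util.Properties`. The sketch below is hypothetical throughout: `UploadConfig`, the property key names, and the default values are placeholders, not recommendations.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

/** Hypothetical config holder; key names and defaults are illustrative only. */
final class UploadConfig {
    final int uploadPartSize;
    final int concurrentFileNumber;
    final int singleFileConcurrentThreads;
    final int retry;
    final int serializationTime;

    private UploadConfig(Properties p) {
        uploadPartSize = intProp(p, "upload.part.size", 5 * 1024 * 1024);
        concurrentFileNumber = intProp(p, "concurrent.file.number", 2);
        singleFileConcurrentThreads = intProp(p, "single.file.concurrent.threads", 4);
        retry = intProp(p, "retry", 3);
        serializationTime = intProp(p, "serialization.time.seconds", 5);
    }

    static UploadConfig load(Reader source) throws IOException {
        Properties p = new Properties();
        p.load(source);                   // standard key=value .properties format
        return new UploadConfig(p);
    }

    private static int intProp(Properties p, String key, int defaultValue) {
        String v = p.getProperty(key);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }
}
```

In practice the Reader would come from a file such as a `.properties` file next to the application. The loaded values would then be copied into the static fields of Constant.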

Global code:

[code]package cloudStorage.basis;
/**
 * @Description: TODO
 * @author: zrk
 * @time: 2015-04-01 17:22:46
 */
public class Global {
    public static final int SUCCESS = 1;
    public static final int ERROR = 10;
    public static final int FILE_NOT_FOUND_ERROR = 11;
    public static final int THREAD_ERROR = 12;
    public static final int NETWORK_ERROR = 13;
    public static final int OSS_SUBMIT_ERROR = 14;
//  META
    public static final String X_OSS_META_MY_MD5 = "x-oss-meta-my-md5";
}


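How to call the upload:

Instantiate OSSUploadFile and call its uploadFile method, which blocks until the upload finishes and returns a status code from Global: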

return new OSSUploadFile(sourcePath,bucketName, key).uploadFile();

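This multi-threaded upload is offered for reference, and I will keep improving it as I use it. For part-based resumable download with the OSS SDK, see "Multi-file, multi-threaded resumable download with OSS (Java)".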
