您的位置:首页 > 编程语言 > Java开发

RxJava和retrofit实现多线程下载

2016-09-02 15:46 218 查看
一直感慨RxJava在线程切换时很强大,最近项目中使用到了下载的功能,就想结合Retrofit来做一下这方面的尝试。

场景很简单,服务器提供3个文件的下载地址。客户端点击按钮去异步下载这些文件。这就涉及到多线程下载。

Retrofit部分:

BaseApi:

/**
* songwenju on 16-8-5 : 16 : 09.
* 邮箱:songwenju@outlook.com
*/
public class BaseApi {
//统一的超时时间管理
public static OkHttpClient mOkHttpClient = new OkHttpClient.Builder()
.readTimeout(AppConstant.READ_TIMEOUT, TimeUnit.SECONDS)//设置读取超时时间
.writeTimeout(AppConstant.WRITE_TIMEOUT,TimeUnit.SECONDS)//设置写的超时时间
.connectTimeout(AppConstant.CONNECT_TIMEOUT,TimeUnit.SECONDS)//设置连接超时时间
.build();
}
DownloadApi:这里是一个单例的,结合了懒汉式和饿汉式的特性,它是线程安全且只有一个实例

/**
* songwenju on 16-8-23 : 16 : 32.
* 邮箱:songwenju@outlook.com
*/
public class DownloadApi extends BaseApi{
//单例的DownloadApi
private static DownloadApi mDownloadApi = new DownloadApi();
private DownloadService mDownloadService;

public static DownloadApi getInstance(){
return mDownloadApi;
}

private DownloadApi(){
Retrofit retrofit = new  Retrofit.Builder()
.client(mOkHttpClient)
.baseUrl(AppConstant.BASE_URL)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
mDownloadService = retrofit.create(DownloadService.class);
}

public DownloadService getDownloadService() {
return mDownloadService;
}
}


DownloadService:这里使用了Streaming注解,该注解的作用是在下载大文件中使用。添加了该注解后,下载文件不会将所有的下载内容加载到内存。

/**
* songwenju on 16-8-23 : 16 : 32.
* 邮箱:songwenju@outlook.com
*/
public interface DownloadService {
@Streaming
@GET
Observable<ResponseBody> downloadFile(@Url String fileUrl);
}
WriteFileManager:将获得ResponseBody的内容写入到文件中。

/**
* 写入文件管理类
*/
public class WriteFileManager {
private static final String TAG = "DownLoadManager";

public static boolean writeResponseBodyToDisk(ResponseBody body,String downloadName) {
String path = FileUtil.getVideoPath() + downloadName;
LogUtil.i(TAG, "WriteFileManager.startToWrite.path:" + path);

File futureFile = new File(path);
InputStream inputStream = null;
OutputStream outputStream = null;
long fileSize = body.contentLength();
LogUtil.d(TAG,"WriteFileManager.writeResponseBodyToDisk.fileSize:"+fileSize);

try {
try {
byte[] fileReader = new byte[1024 * 1024];
long fileSizeDownloaded = 0;
inputStream = body.byteStream();
outputStream = new FileOutputStream(futureFile);

while (true) {
int read = inputStream.read(fileReader);

if (read == -1) {
break;
}
outputStream.write(fileReader, 0, read);
fileSizeDownloaded += read;
//                    LogUtil.i(TAG, "file download: " + fileSizeDownloaded + " of " + fileSize);
}
LogUtil.d(TAG, "file download: " + fileSizeDownloaded + " of " + fileSize);
outputStream.flush();
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
} finally {
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
}
} catch (IOException e) {
return false;
}

}
}


在activity中:这里使用merge去将observable合并。

public class MainActivity extends AppCompatActivity {

private DownloadService mDownloadService;
private List<String> mDownloadList;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
DownloadApi downloadApi = DownloadApi.getInstance();
mDownloadService = downloadApi.getDownloadService();
final String downloadUrl1 = AppConstant.DOWNLOAD_URL1;
final String downloadUrl2 = AppConstant.DOWNLOAD_URL2;
final String downloadUrl3 = AppConstant.DOWNLOAD_URL3;

mDownloadList = new ArrayList<>();
mDownloadList.add(downloadUrl1);
mDownloadList.add(downloadUrl2);
mDownloadList.add(downloadUrl3);
}

/**
* 多线程下载
* @param view view
*/
public void download(View view) {
LogUtil.i(this, "MainActivity.download.");

List<Observable<Boolean>> observables = new ArrayList<Observable<Boolean>>();
//将所有的Observable放到List中
for (int i = 0; i < mDownloadList.size(); i++) {
final String downloadUrl = mDownloadList.get(i);
observables.add(mDownloadService.downloadFile(downloadUrl)
.subscribeOn(Schedulers.io())
.map(new Func1<ResponseBody, Boolean>() {
@Override
public Boolean call(ResponseBody responseBody) {
return WriteFileManager.writeResponseBodyToDisk(responseBody, downloadUrl);
}
}).subscribeOn(Schedulers.io()));
}

//Observable的merge将所有的Observable合成一个Observable,所有的observable同时发射数据。
Observable.merge(observables).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(Boolean b) {
if (b) {
Toast.makeText(MainActivity.this, "Download is sucess", Toast.LENGTH_LONG).show();
}
}
});
}

}
打出的log:
01-09 05:24:04.474 28625-28625/? I/swj_htv_MainActivity: MainActivity.download.
01-09 05:24:04.574 28625-28786/? I/swj_htv_DownLoadManager: DownloadManager.startToWrite.path:/storage/emulated/0/Movies/mvp.wmv
01-09 05:24:04.575 28625-28786/? D/swj_htv_DownLoadManager: DownloadManager.writeResponseBodyToDisk.fileSize:400937223
01-09 05:24:04.575 28625-28785/? I/swj_htv_DownLoadManager: DownloadManager.startToWrite.path:/storage/emulated/0/Movies/chess.mp4
01-09 05:24:04.575 28625-28785/? D/swj_htv_DownLoadManager: DownloadManager.writeResponseBodyToDisk.fileSize:6366633
01-09 05:24:04.575 28625-28784/? I/swj_htv_DownLoadManager: DownloadManager.startToWrite.path:/storage/emulated/0/Movies/protein.mp4
01-09 05:24:04.575 28625-28784/? D/swj_htv_DownLoadManager: DownloadManager.writeResponseBodyToDisk.fileSize:374081533
01-09 05:24:06.039 28625-28785/? D/swj_htv_DownLoadManager: file download: 6366633 of 6366633
01-09 05:25:12.286 28625-28784/? D/swj_htv_DownLoadManager: file download: 374081533 of 374081533
01-09 05:25:14.302 28625-28786/? D/swj_htv_DownLoadManager: file download: 400937223 of 400937223


项目对应的demo已经上传到github上:

RxJavaRetrofitDownload
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息