Java多线程、断点下载程序
2009-05-12 19:10
573 查看
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Downloads one HTTP resource as a set of byte-range sections fetched in
 * parallel, and resumes a previous, unfinished download when possible.
 *
 * <p>For a target path {@code X} two working files are used: {@code X.r_task}
 * (the task description with per-section progress, maintained by {@link Task})
 * and {@code X.r_save} (the data being downloaded). When every section has
 * completed, the task file is deleted and {@code X.r_save} is renamed to
 * {@code X}.
 */
public class TaskAssign {
    /** Longest time, in seconds, that {@link #work(Task)} waits for all sections (24 hours). */
    private static final long MAX_WAIT_SECONDS = 24L * 3600L;

    /**
     * Runs the download described by {@code task}, resuming from the task file
     * if one exists. Returns immediately when the target file already exists.
     * No retry is attempted: when a section fails, the working files are kept
     * so that a later call can resume.
     *
     * @param task download description (URL, target path, section and worker counts)
     * @throws IOException if the server reports no length, a file operation
     *         fails, or the finished download cannot be moved into place
     * @throws IllegalStateException if the stored task does not match the
     *         length currently reported by the server
     */
    public void work(Task task) throws IOException {
        File file = new File(task.getSaveFile());
        if (file.exists()) {
            return;
        }
        // Records whether every section completed.
        final AtomicBoolean success = new AtomicBoolean(true);
        // Task description file.
        File taskFile = new File(task.getSaveFile() + ".r_task");
        // File that actually receives the downloaded bytes.
        File saveFile = new File(task.getSaveFile() + ".r_save");
        // An empty task file is what an earlier, failed start leaves behind;
        // treat it as absent instead of failing to parse it on every run.
        boolean taskFileExist = taskFile.exists() && taskFile.length() > 0;

        // Ask for the length before creating any working file, so that a
        // failed probe does not leave stray files on disk.
        long rtnLen = getContentLength(task.getDownURL());
        if (rtnLen < 0) {
            throw new IOException("Server did not report a Content-Length for "
                    + task.getDownURL());
        }

        RandomAccessFile taskRandomFile = null;
        RandomAccessFile downRandomFile = null;
        try {
            taskRandomFile = new RandomAccessFile(taskFile, "rw");
            downRandomFile = new RandomAccessFile(saveFile, "rw");
            if (!taskFileExist) {
                // Fresh download: initialise the task file and size the data file.
                task.setContentLength(rtnLen);
                initTaskFile(taskRandomFile, task);
                downRandomFile.setLength(rtnLen);
            } else {
                // Resume: load the recorded progress.
                task.read(taskRandomFile);
                if (task.getContentLength() != rtnLen) {
                    throw new IllegalStateException("Remote length changed: task file records "
                            + task.getContentLength() + " bytes but server reports " + rtnLen);
                }
            }
            int secCount = task.getSectionCount();
            // Hand the sections to a fixed-size thread pool.
            ExecutorService es = Executors.newFixedThreadPool(task.getWorkerCount());
            try {
                for (int i = 0; i < secCount; i++) {
                    final int j = i;
                    final Task t = task;
                    final RandomAccessFile f1 = taskRandomFile;
                    final RandomAccessFile f2 = downRandomFile;
                    es.execute(new Runnable() {
                        public void run() {
                            try {
                                down(f1, f2, t, j);
                            } catch (IOException e) {
                                success.set(false);
                                e.printStackTrace(System.out);
                            } catch (RuntimeException e) {
                                // down() reports protocol violations with unchecked
                                // exceptions; they must fail the download as well.
                                success.set(false);
                                e.printStackTrace(System.out);
                            }
                        }
                    });
                }
                es.shutdown();
                if (!es.awaitTermination(MAX_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    // Timed out with sections still running: not a complete download.
                    success.set(false);
                }
            } catch (InterruptedException e) {
                success.set(false);
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            } finally {
                // No-op when the pool has terminated; otherwise stops the
                // workers before the files they write to are closed.
                es.shutdownNow();
            }
            taskRandomFile.close();
            taskRandomFile = null;
            downRandomFile.close();
            downRandomFile = null;
            // On success remove the task file and give the data file its final name.
            if (success.get()) {
                if (!taskFile.delete()) {
                    throw new IOException("Could not delete task file " + taskFile);
                }
                if (!saveFile.renameTo(file)) {
                    throw new IOException("Could not rename " + saveFile + " to " + file);
                }
            } else {
                System.out.println("Download incomplete, run again to resume: " + file);
            }
        } finally {
            if (taskRandomFile != null) {
                taskRandomFile.close();
                taskRandomFile = null;
            }
            if (downRandomFile != null) {
                downRandomFile.close();
                downRandomFile = null;
            }
        }
    }

    /**
     * Downloads the remaining bytes of one section and records progress after
     * every buffer written.
     *
     * @param taskRandomFile open task file; progress is written to it
     * @param downRandomFile open data file; section bytes are written to it
     * @param task           shared task; also used as the lock guarding both files
     * @param sectionNo      zero-based index of the section to download
     * @throws IOException if the transfer fails or the response body ends
     *         before the section is complete
     * @throws IllegalStateException if the server does not answer with
     *         206 Partial Content of exactly the requested length
     */
    public void down(RandomAccessFile taskRandomFile,
            RandomAccessFile downRandomFile, Task task, int sectionNo)
            throws IOException {
        URL u = new URL(task.getDownURL());
        HttpURLConnection conn = (HttpURLConnection) u.openConnection();
        long start = task.getSectionsOffset()[sectionNo];
        long end;
        // Exclusive end of this section; the last section absorbs the
        // remainder left by the integer division.
        if (sectionNo < task.getSectionCount() - 1) {
            long per = task.getContentLength() / task.getSectionCount();
            end = per * (sectionNo + 1);
        } else {
            end = task.getContentLength();
        }
        if (start >= end) {
            System.out.println("Section has finished before. " + sectionNo);
            return;
        }
        String range = "bytes=" + start + "-" + (end - 1);
        conn.setRequestProperty("Range", range);
        conn.setRequestProperty("User-Agent", "Ray-Downer");
        long offset = start;
        try {
            conn.connect();
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_PARTIAL) {
                throw new IllegalStateException("Expected HTTP 206 for range " + range
                        + " but got " + status);
            }
            long bodyLength = readContentLength(conn);
            if (bodyLength != (end - start)) {
                throw new IllegalStateException("Range " + range + " should be "
                        + (end - start) + " bytes but server announced " + bodyLength);
            }
            InputStream is = conn.getInputStream();
            byte[] temp = new byte[task.getBufferSize()];
            int readed;
            while ((readed = is.read(temp)) > 0) {
                synchronized (task) {
                    // Both files are shared by all workers, so the seek/write
                    // pair and the progress update happen under one lock.
                    downRandomFile.seek(offset);
                    downRandomFile.write(temp, 0, readed);
                    offset += readed;
                    task.getSectionsOffset()[sectionNo] = offset;
                    task.writeOffset(taskRandomFile);
                }
            }
        } finally {
            conn.disconnect();
        }
        if (offset != end) {
            // The stream ended early; the recorded offset lets a later run resume.
            throw new IOException("Section " + sectionNo + " ended at byte " + offset
                    + ", expected " + end);
        }
        System.out.println("Section finished. " + sectionNo);
    }

    /**
     * Splits the content into equally sized sections (the last one takes the
     * remainder), stores the start offsets in {@code task} and writes a new
     * task file.
     *
     * @param taskRandomFile open task file to initialise
     * @param task           task whose section count and content length are already set
     * @throws IOException if the task file cannot be written
     * @throws IllegalArgumentException if the section count is not positive
     */
    public void initTaskFile(RandomAccessFile taskRandomFile, Task task)
            throws IOException {
        int secCount = task.getSectionCount();
        if (secCount <= 0) {
            throw new IllegalArgumentException("Section count must be positive: " + secCount);
        }
        long per = task.getContentLength() / secCount;
        long[] sectionsOffset = new long[secCount];
        for (int i = 0; i < secCount; i++) {
            sectionsOffset[i] = per * i;
        }
        task.setSectionsOffset(sectionsOffset);
        task.create(taskRandomFile);
    }

    /**
     * Returns the length the server announces for {@code url}.
     *
     * @param url resource to probe
     * @return the Content-Length value, or -1 if it is missing or not a number
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    public long getContentLength(String url) throws IOException {
        URL u = new URL(url);
        HttpURLConnection conn = (HttpURLConnection) u.openConnection();
        try {
            return readContentLength(conn);
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Parses the Content-Length response header as a long, so that lengths
     * above Integer.MAX_VALUE are not lost.
     */
    private static long readContentLength(HttpURLConnection conn) {
        String header = conn.getHeaderField("Content-Length");
        if (header == null) {
            return -1;
        }
        try {
            return Long.parseLong(header.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Describes one download (URL, target path, section layout, progress) and
 * reads/writes that description from/to a task file.
 *
 * <p>Task file layout: a fixed header of {@link #HEAD_SIZE} bytes holding
 * {@code writeUTF(downURL)}, {@code writeUTF(saveFile)}, the section count
 * (int) and the content length (long), padded with zeros; followed by one
 * big-endian long per section holding that section's current offset.
 */
public class Task {
    /** Number of bytes reserved at the start of the task file for the header. */
    public static final int HEAD_SIZE = 4096;

    private int bufferSize = 64 * 1024;
    private int sectionCount;
    private String downURL;
    private long contentLength;
    private long[] sectionsOffset;
    private int workerCount;
    private String saveFile;

    public long getContentLength() {
        return contentLength;
    }

    public void setContentLength(long contentLength) {
        this.contentLength = contentLength;
    }

    /** Returns the live array of per-section offsets (not a copy). */
    public long[] getSectionsOffset() {
        return sectionsOffset;
    }

    public void setSectionsOffset(long[] sectionsOffset) {
        this.sectionsOffset = sectionsOffset;
    }

    public int getSectionCount() {
        return sectionCount;
    }

    public void setSectionCount(int sectionCount) {
        this.sectionCount = sectionCount;
    }

    public String getDownURL() {
        return downURL;
    }

    public void setDownURL(String downURL) {
        this.downURL = downURL;
    }

    public String getSaveFile() {
        return saveFile;
    }

    public void setSaveFile(String saveFile) {
        this.saveFile = saveFile;
    }

    public int getBufferSize() {
        return bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public int getWorkerCount() {
        return workerCount;
    }

    public void setWorkerCount(int workerCount) {
        this.workerCount = workerCount;
    }

    /**
     * Loads URL, target path, section count, content length and section
     * offsets from a task file. The fields of this object are only replaced
     * once the whole file has been read successfully.
     *
     * @param file open task file
     * @throws IOException if reading fails or the offset table is shorter
     *         than the header claims
     * @throws IllegalStateException if the header is truncated or stores a
     *         negative section count
     */
    public synchronized void read(RandomAccessFile file) throws IOException {
        byte[] header = new byte[HEAD_SIZE];
        file.seek(0);
        // A single read() may legally return fewer bytes than requested.
        int filled = 0;
        while (filled < header.length) {
            int n = file.read(header, filled, header.length - filled);
            if (n < 0) {
                throw new IllegalStateException("Task file header is truncated: "
                        + filled + " of " + HEAD_SIZE + " bytes");
            }
            filled += n;
        }
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header));
        String url = dis.readUTF();
        String target = dis.readUTF();
        int count = dis.readInt();
        long length = dis.readLong();
        if (count < 0) {
            throw new IllegalStateException("Task file stores a negative section count: " + count);
        }
        // Check against the real file size before allocating the array.
        if (HEAD_SIZE + 8L * count > file.length()) {
            throw new IOException("Task file is too short for " + count + " section offsets");
        }
        long[] offsets = new long[count];
        // The file pointer now sits at HEAD_SIZE, where the offset table starts.
        for (int i = 0; i < count; i++) {
            offsets[i] = file.readLong();
        }
        downURL = url;
        saveFile = target;
        sectionCount = count;
        contentLength = length;
        sectionsOffset = offsets;
    }

    /**
     * Writes a complete task file (header plus offset table) from this object.
     * Nothing is written when the state is invalid.
     *
     * @param file open task file
     * @throws IOException if writing fails
     * @throws IllegalStateException if the offsets do not match the section
     *         count, or the encoded header does not fit into {@link #HEAD_SIZE}
     */
    public synchronized void create(RandomAccessFile file) throws IOException {
        checkOffsets();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeUTF(downURL);
        dos.writeUTF(saveFile);
        dos.writeInt(sectionCount);
        dos.writeLong(contentLength);
        byte[] src = baos.toByteArray();
        if (src.length > HEAD_SIZE) {
            throw new IllegalStateException("Task header needs " + src.length
                    + " bytes but only " + HEAD_SIZE + " are reserved");
        }
        file.setLength(HEAD_SIZE + 8L * sectionCount);
        // Zero-padded to the fixed header size.
        byte[] temp = new byte[HEAD_SIZE];
        System.arraycopy(src, 0, temp, 0, src.length);
        file.seek(0);
        file.write(temp);
        writeOffset(file);
    }

    /**
     * Rewrites the offset table that follows the header with the current
     * section offsets.
     *
     * @param file open task file
     * @throws IOException if writing fails
     * @throws IllegalStateException if the offsets do not match the section count
     */
    public synchronized void writeOffset(RandomAccessFile file)
            throws IOException {
        checkOffsets();
        // Encode the whole table in memory and write it with one call;
        // this method runs after every downloaded buffer.
        ByteArrayOutputStream baos = new ByteArrayOutputStream(8 * sectionsOffset.length);
        DataOutputStream dos = new DataOutputStream(baos);
        for (int i = 0; i < sectionsOffset.length; i++) {
            dos.writeLong(sectionsOffset[i]);
        }
        file.seek(HEAD_SIZE);
        file.write(baos.toByteArray());
    }

    /** Verifies that the offset array exists and has one entry per section. */
    private void checkOffsets() {
        if (sectionsOffset == null || sectionCount != sectionsOffset.length) {
            throw new IllegalStateException("Section count " + sectionCount
                    + " does not match the section offsets ("
                    + (sectionsOffset == null ? "none" : String.valueOf(sectionsOffset.length))
                    + ")");
        }
    }
}
import java.io.IOException;
/**
 * Command-line entry point that runs one of three sample downloads.
 */
public class Main {
    /**
     * Runs the {@link #test3()} sample and prints a closing banner.
     *
     * @param args ignored
     * @throws IOException if the download fails with an I/O error
     */
    public static void main(String[] args) throws IOException {
        test3();
        // The separators are line breaks; "/n" would be printed literally.
        System.out.println("\n\n===============\nFinished.");
    }

    /** Sample: 200 sections fetched by 100 workers with a 256 KiB buffer. */
    public static void test1() throws IOException {
        download("http://61.152.235.21/qqfile/qq/2007iistable/QQ2007IIKB1.exe",
                "c:/Test2.exe", 200, 100, 256 * 1024);
    }

    /** Sample: 500 sections fetched by 10 workers with a 128 KiB buffer. */
    public static void test3() throws IOException {
        download("http://apache.mirror.phpchina.com/maven/binaries/apache-maven-2.1.0-bin.zip",
                "c:/apache-maven-2.1.0-bin.zip", 500, 10, 128 * 1024);
    }

    /** Sample: 30 sections fetched by 30 workers with a 128 KiB buffer. */
    public static void test2() throws IOException {
        download("http://down.sandai.net/Thunder5.7.9.472.exe",
                "c:/Thunder.exe", 30, 30, 128 * 1024);
    }

    /** Builds a Task from the given settings and runs it through a new TaskAssign. */
    private static void download(String url, String saveFile, int sections,
            int workers, int bufferSize) throws IOException {
        Task task = new Task();
        task.setDownURL(url);
        task.setSaveFile(saveFile);
        task.setSectionCount(sections);
        task.setWorkerCount(workers);
        task.setBufferSize(bufferSize);
        TaskAssign ta = new TaskAssign();
        ta.work(task);
    }
}
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Downloads one HTTP resource as a set of byte-range sections fetched in
 * parallel, and resumes a previous, unfinished download when possible.
 *
 * <p>For a target path {@code X} two working files are used: {@code X.r_task}
 * (the task description with per-section progress, maintained by {@link Task})
 * and {@code X.r_save} (the data being downloaded). When every section has
 * completed, the task file is deleted and {@code X.r_save} is renamed to
 * {@code X}.
 */
public class TaskAssign {
    /** Longest time, in seconds, that {@link #work(Task)} waits for all sections (24 hours). */
    private static final long MAX_WAIT_SECONDS = 24L * 3600L;

    /**
     * Runs the download described by {@code task}, resuming from the task file
     * if one exists. Returns immediately when the target file already exists.
     * No retry is attempted: when a section fails, the working files are kept
     * so that a later call can resume.
     *
     * @param task download description (URL, target path, section and worker counts)
     * @throws IOException if the server reports no length, a file operation
     *         fails, or the finished download cannot be moved into place
     * @throws IllegalStateException if the stored task does not match the
     *         length currently reported by the server
     */
    public void work(Task task) throws IOException {
        File file = new File(task.getSaveFile());
        if (file.exists()) {
            return;
        }
        // Records whether every section completed.
        final AtomicBoolean success = new AtomicBoolean(true);
        // Task description file.
        File taskFile = new File(task.getSaveFile() + ".r_task");
        // File that actually receives the downloaded bytes.
        File saveFile = new File(task.getSaveFile() + ".r_save");
        // An empty task file is what an earlier, failed start leaves behind;
        // treat it as absent instead of failing to parse it on every run.
        boolean taskFileExist = taskFile.exists() && taskFile.length() > 0;

        // Ask for the length before creating any working file, so that a
        // failed probe does not leave stray files on disk.
        long rtnLen = getContentLength(task.getDownURL());
        if (rtnLen < 0) {
            throw new IOException("Server did not report a Content-Length for "
                    + task.getDownURL());
        }

        RandomAccessFile taskRandomFile = null;
        RandomAccessFile downRandomFile = null;
        try {
            taskRandomFile = new RandomAccessFile(taskFile, "rw");
            downRandomFile = new RandomAccessFile(saveFile, "rw");
            if (!taskFileExist) {
                // Fresh download: initialise the task file and size the data file.
                task.setContentLength(rtnLen);
                initTaskFile(taskRandomFile, task);
                downRandomFile.setLength(rtnLen);
            } else {
                // Resume: load the recorded progress.
                task.read(taskRandomFile);
                if (task.getContentLength() != rtnLen) {
                    throw new IllegalStateException("Remote length changed: task file records "
                            + task.getContentLength() + " bytes but server reports " + rtnLen);
                }
            }
            int secCount = task.getSectionCount();
            // Hand the sections to a fixed-size thread pool.
            ExecutorService es = Executors.newFixedThreadPool(task.getWorkerCount());
            try {
                for (int i = 0; i < secCount; i++) {
                    final int j = i;
                    final Task t = task;
                    final RandomAccessFile f1 = taskRandomFile;
                    final RandomAccessFile f2 = downRandomFile;
                    es.execute(new Runnable() {
                        public void run() {
                            try {
                                down(f1, f2, t, j);
                            } catch (IOException e) {
                                success.set(false);
                                e.printStackTrace(System.out);
                            } catch (RuntimeException e) {
                                // down() reports protocol violations with unchecked
                                // exceptions; they must fail the download as well.
                                success.set(false);
                                e.printStackTrace(System.out);
                            }
                        }
                    });
                }
                es.shutdown();
                if (!es.awaitTermination(MAX_WAIT_SECONDS, TimeUnit.SECONDS)) {
                    // Timed out with sections still running: not a complete download.
                    success.set(false);
                }
            } catch (InterruptedException e) {
                success.set(false);
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            } finally {
                // No-op when the pool has terminated; otherwise stops the
                // workers before the files they write to are closed.
                es.shutdownNow();
            }
            taskRandomFile.close();
            taskRandomFile = null;
            downRandomFile.close();
            downRandomFile = null;
            // On success remove the task file and give the data file its final name.
            if (success.get()) {
                if (!taskFile.delete()) {
                    throw new IOException("Could not delete task file " + taskFile);
                }
                if (!saveFile.renameTo(file)) {
                    throw new IOException("Could not rename " + saveFile + " to " + file);
                }
            } else {
                System.out.println("Download incomplete, run again to resume: " + file);
            }
        } finally {
            if (taskRandomFile != null) {
                taskRandomFile.close();
                taskRandomFile = null;
            }
            if (downRandomFile != null) {
                downRandomFile.close();
                downRandomFile = null;
            }
        }
    }

    /**
     * Downloads the remaining bytes of one section and records progress after
     * every buffer written.
     *
     * @param taskRandomFile open task file; progress is written to it
     * @param downRandomFile open data file; section bytes are written to it
     * @param task           shared task; also used as the lock guarding both files
     * @param sectionNo      zero-based index of the section to download
     * @throws IOException if the transfer fails or the response body ends
     *         before the section is complete
     * @throws IllegalStateException if the server does not answer with
     *         206 Partial Content of exactly the requested length
     */
    public void down(RandomAccessFile taskRandomFile,
            RandomAccessFile downRandomFile, Task task, int sectionNo)
            throws IOException {
        URL u = new URL(task.getDownURL());
        HttpURLConnection conn = (HttpURLConnection) u.openConnection();
        long start = task.getSectionsOffset()[sectionNo];
        long end;
        // Exclusive end of this section; the last section absorbs the
        // remainder left by the integer division.
        if (sectionNo < task.getSectionCount() - 1) {
            long per = task.getContentLength() / task.getSectionCount();
            end = per * (sectionNo + 1);
        } else {
            end = task.getContentLength();
        }
        if (start >= end) {
            System.out.println("Section has finished before. " + sectionNo);
            return;
        }
        String range = "bytes=" + start + "-" + (end - 1);
        conn.setRequestProperty("Range", range);
        conn.setRequestProperty("User-Agent", "Ray-Downer");
        long offset = start;
        try {
            conn.connect();
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_PARTIAL) {
                throw new IllegalStateException("Expected HTTP 206 for range " + range
                        + " but got " + status);
            }
            long bodyLength = readContentLength(conn);
            if (bodyLength != (end - start)) {
                throw new IllegalStateException("Range " + range + " should be "
                        + (end - start) + " bytes but server announced " + bodyLength);
            }
            InputStream is = conn.getInputStream();
            byte[] temp = new byte[task.getBufferSize()];
            int readed;
            while ((readed = is.read(temp)) > 0) {
                synchronized (task) {
                    // Both files are shared by all workers, so the seek/write
                    // pair and the progress update happen under one lock.
                    downRandomFile.seek(offset);
                    downRandomFile.write(temp, 0, readed);
                    offset += readed;
                    task.getSectionsOffset()[sectionNo] = offset;
                    task.writeOffset(taskRandomFile);
                }
            }
        } finally {
            conn.disconnect();
        }
        if (offset != end) {
            // The stream ended early; the recorded offset lets a later run resume.
            throw new IOException("Section " + sectionNo + " ended at byte " + offset
                    + ", expected " + end);
        }
        System.out.println("Section finished. " + sectionNo);
    }

    /**
     * Splits the content into equally sized sections (the last one takes the
     * remainder), stores the start offsets in {@code task} and writes a new
     * task file.
     *
     * @param taskRandomFile open task file to initialise
     * @param task           task whose section count and content length are already set
     * @throws IOException if the task file cannot be written
     * @throws IllegalArgumentException if the section count is not positive
     */
    public void initTaskFile(RandomAccessFile taskRandomFile, Task task)
            throws IOException {
        int secCount = task.getSectionCount();
        if (secCount <= 0) {
            throw new IllegalArgumentException("Section count must be positive: " + secCount);
        }
        long per = task.getContentLength() / secCount;
        long[] sectionsOffset = new long[secCount];
        for (int i = 0; i < secCount; i++) {
            sectionsOffset[i] = per * i;
        }
        task.setSectionsOffset(sectionsOffset);
        task.create(taskRandomFile);
    }

    /**
     * Returns the length the server announces for {@code url}.
     *
     * @param url resource to probe
     * @return the Content-Length value, or -1 if it is missing or not a number
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    public long getContentLength(String url) throws IOException {
        URL u = new URL(url);
        HttpURLConnection conn = (HttpURLConnection) u.openConnection();
        try {
            return readContentLength(conn);
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Parses the Content-Length response header as a long, so that lengths
     * above Integer.MAX_VALUE are not lost.
     */
    private static long readContentLength(HttpURLConnection conn) {
        String header = conn.getHeaderField("Content-Length");
        if (header == null) {
            return -1;
        }
        try {
            return Long.parseLong(header.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Describes one download (URL, target path, section layout, progress) and
 * reads/writes that description from/to a task file.
 *
 * <p>Task file layout: a fixed header of {@link #HEAD_SIZE} bytes holding
 * {@code writeUTF(downURL)}, {@code writeUTF(saveFile)}, the section count
 * (int) and the content length (long), padded with zeros; followed by one
 * big-endian long per section holding that section's current offset.
 */
public class Task {
    /** Number of bytes reserved at the start of the task file for the header. */
    public static final int HEAD_SIZE = 4096;

    private int bufferSize = 64 * 1024;
    private int sectionCount;
    private String downURL;
    private long contentLength;
    private long[] sectionsOffset;
    private int workerCount;
    private String saveFile;

    public long getContentLength() {
        return contentLength;
    }

    public void setContentLength(long contentLength) {
        this.contentLength = contentLength;
    }

    /** Returns the live array of per-section offsets (not a copy). */
    public long[] getSectionsOffset() {
        return sectionsOffset;
    }

    public void setSectionsOffset(long[] sectionsOffset) {
        this.sectionsOffset = sectionsOffset;
    }

    public int getSectionCount() {
        return sectionCount;
    }

    public void setSectionCount(int sectionCount) {
        this.sectionCount = sectionCount;
    }

    public String getDownURL() {
        return downURL;
    }

    public void setDownURL(String downURL) {
        this.downURL = downURL;
    }

    public String getSaveFile() {
        return saveFile;
    }

    public void setSaveFile(String saveFile) {
        this.saveFile = saveFile;
    }

    public int getBufferSize() {
        return bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public int getWorkerCount() {
        return workerCount;
    }

    public void setWorkerCount(int workerCount) {
        this.workerCount = workerCount;
    }

    /**
     * Loads URL, target path, section count, content length and section
     * offsets from a task file. The fields of this object are only replaced
     * once the whole file has been read successfully.
     *
     * @param file open task file
     * @throws IOException if reading fails or the offset table is shorter
     *         than the header claims
     * @throws IllegalStateException if the header is truncated or stores a
     *         negative section count
     */
    public synchronized void read(RandomAccessFile file) throws IOException {
        byte[] header = new byte[HEAD_SIZE];
        file.seek(0);
        // A single read() may legally return fewer bytes than requested.
        int filled = 0;
        while (filled < header.length) {
            int n = file.read(header, filled, header.length - filled);
            if (n < 0) {
                throw new IllegalStateException("Task file header is truncated: "
                        + filled + " of " + HEAD_SIZE + " bytes");
            }
            filled += n;
        }
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header));
        String url = dis.readUTF();
        String target = dis.readUTF();
        int count = dis.readInt();
        long length = dis.readLong();
        if (count < 0) {
            throw new IllegalStateException("Task file stores a negative section count: " + count);
        }
        // Check against the real file size before allocating the array.
        if (HEAD_SIZE + 8L * count > file.length()) {
            throw new IOException("Task file is too short for " + count + " section offsets");
        }
        long[] offsets = new long[count];
        // The file pointer now sits at HEAD_SIZE, where the offset table starts.
        for (int i = 0; i < count; i++) {
            offsets[i] = file.readLong();
        }
        downURL = url;
        saveFile = target;
        sectionCount = count;
        contentLength = length;
        sectionsOffset = offsets;
    }

    /**
     * Writes a complete task file (header plus offset table) from this object.
     * Nothing is written when the state is invalid.
     *
     * @param file open task file
     * @throws IOException if writing fails
     * @throws IllegalStateException if the offsets do not match the section
     *         count, or the encoded header does not fit into {@link #HEAD_SIZE}
     */
    public synchronized void create(RandomAccessFile file) throws IOException {
        checkOffsets();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeUTF(downURL);
        dos.writeUTF(saveFile);
        dos.writeInt(sectionCount);
        dos.writeLong(contentLength);
        byte[] src = baos.toByteArray();
        if (src.length > HEAD_SIZE) {
            throw new IllegalStateException("Task header needs " + src.length
                    + " bytes but only " + HEAD_SIZE + " are reserved");
        }
        file.setLength(HEAD_SIZE + 8L * sectionCount);
        // Zero-padded to the fixed header size.
        byte[] temp = new byte[HEAD_SIZE];
        System.arraycopy(src, 0, temp, 0, src.length);
        file.seek(0);
        file.write(temp);
        writeOffset(file);
    }

    /**
     * Rewrites the offset table that follows the header with the current
     * section offsets.
     *
     * @param file open task file
     * @throws IOException if writing fails
     * @throws IllegalStateException if the offsets do not match the section count
     */
    public synchronized void writeOffset(RandomAccessFile file)
            throws IOException {
        checkOffsets();
        // Encode the whole table in memory and write it with one call;
        // this method runs after every downloaded buffer.
        ByteArrayOutputStream baos = new ByteArrayOutputStream(8 * sectionsOffset.length);
        DataOutputStream dos = new DataOutputStream(baos);
        for (int i = 0; i < sectionsOffset.length; i++) {
            dos.writeLong(sectionsOffset[i]);
        }
        file.seek(HEAD_SIZE);
        file.write(baos.toByteArray());
    }

    /** Verifies that the offset array exists and has one entry per section. */
    private void checkOffsets() {
        if (sectionsOffset == null || sectionCount != sectionsOffset.length) {
            throw new IllegalStateException("Section count " + sectionCount
                    + " does not match the section offsets ("
                    + (sectionsOffset == null ? "none" : String.valueOf(sectionsOffset.length))
                    + ")");
        }
    }
}
import java.io.IOException;
/**
 * Command-line entry point that runs one of three sample downloads.
 */
public class Main {
    /**
     * Runs the {@link #test3()} sample and prints a closing banner.
     *
     * @param args ignored
     * @throws IOException if the download fails with an I/O error
     */
    public static void main(String[] args) throws IOException {
        test3();
        // The separators are line breaks; "/n" would be printed literally.
        System.out.println("\n\n===============\nFinished.");
    }

    /** Sample: 200 sections fetched by 100 workers with a 256 KiB buffer. */
    public static void test1() throws IOException {
        download("http://61.152.235.21/qqfile/qq/2007iistable/QQ2007IIKB1.exe",
                "c:/Test2.exe", 200, 100, 256 * 1024);
    }

    /** Sample: 500 sections fetched by 10 workers with a 128 KiB buffer. */
    public static void test3() throws IOException {
        download("http://apache.mirror.phpchina.com/maven/binaries/apache-maven-2.1.0-bin.zip",
                "c:/apache-maven-2.1.0-bin.zip", 500, 10, 128 * 1024);
    }

    /** Sample: 30 sections fetched by 30 workers with a 128 KiB buffer. */
    public static void test2() throws IOException {
        download("http://down.sandai.net/Thunder5.7.9.472.exe",
                "c:/Thunder.exe", 30, 30, 128 * 1024);
    }

    /** Builds a Task from the given settings and runs it through a new TaskAssign. */
    private static void download(String url, String saveFile, int sections,
            int workers, int bufferSize) throws IOException {
        Task task = new Task();
        task.setDownURL(url);
        task.setSaveFile(saveFile);
        task.setSectionCount(sections);
        task.setWorkerCount(workers);
        task.setBufferSize(bufferSize);
        TaskAssign ta = new TaskAssign();
        ta.work(task);
    }
}
相关文章推荐
- 一个简单的多线程、断点下载Java程序
- 用java编写多线程ftp断点下载文件程序
- Java多线程断点下载多文件(窗口程序带进度条)
- JAVA写的多线程下载程序,并具有断点续传功能
- 用java编写多线程ftp断点下载文件程序
- JAVA写的多线程下载程序,并具有断点续传功能
- Java 多线程断点下载文件
- Java实现多线程断点下载(下载过程中可以暂停)
- JAVA实现多线程断点下载
- Java多线程断点下载
- Java实现多线程断点下载
- Java 多线程断点下载(面向对象)
- Java---多线程断点下载
- 〖編程·Java〗Java 多线程断点下载文件
- JAVA多线程断点下载
- 使用java实现http多线程断点下载文件(一)
- java 多线程断点下载功能
- java实现多线程断点下载
- Java基础之多线程断点下载
- Java多线程断点下载