您的位置:首页 > 其它

一个基于NIO的下载队列实现

2012-02-09 17:59 323 查看
package l.test1.queue.nio;

import java.io.File;

public interface Task {
public String getNetWorkPath();

public String getName();
/**
*
* @param file
*/
public void success(File file);
/**
*
* @param netWorkPath
*/
public void fail(String netWorkPath);
}

package l.test1.queue.nio;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

public class NIODownLoad extends Thread {
private boolean run = true;
private static final int DOWN_LOAD_MAX_COUNT = 10;
private int downLoadCount = 0;
private Selector selector = null;
private Queue<Task> queue = new LinkedList<Task>();

private NIODownLoad() {
setDaemon(true);
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}

public static final NIODownLoad instance = new NIODownLoad();

static{
instance.start();
}

public static NIODownLoad getInstance() {
return instance;
}

@Override
public void run() {
SelectionKey key = null;
while (run) {
try {
if(selector.select() <=0 ){
success();
continue;
}
for (Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator(); iter.hasNext();) {
key = iter.next();
iter.remove();
if (key.isConnectable()) {
connect(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (IOException e) {
if(null != key) {
key.cancel();
key = null;
downLoadCount --;
}
e.printStackTrace();
}
}
}

public void add(Task task) {
synchronized (queue) {
File file = new File(DownLoadSession.basePath+"/"+task.getName());
if(file.exists()) {
task.success(file);
return;
}
if(queue.contains(task)) {
System.out.println("exist");
return;
}
queue.add(task);
if(downLoadCount < DOWN_LOAD_MAX_COUNT) {
//				instance.interrupt();
selector.wakeup();
}
}
}

private void connect(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
if (client.finishConnect()) {
Task task = (Task) key.attachment();
client.register(selector, SelectionKey.OP_WRITE,new DownLoadSession(client, task));
}
}

private void read(SelectionKey key) throws IOException {
DownLoadSession session = (DownLoadSession) key.attachment();
if (session.read() == -1) {
downLoadCount --;
success();
key.cancel();
}
}

private void write(SelectionKey key) throws IOException {
DownLoadSession session = (DownLoadSession) key.attachment();
if (session.write() == -1) {
key.interestOps(SelectionKey.OP_READ);
}
}

private void success() {
if(downLoadCount < DOWN_LOAD_MAX_COUNT) {
synchronized (queue) {
if(!queue.isEmpty()) {
System.out.println("success");
Task task = queue.poll();
URL url;
try {
url = new URL(task.getNetWorkPath());
int port = url.getPort();
if(-1 == port) {
port = 80;
}
InetSocketAddress remote = new InetSocketAddress(url.getHost(),port);
try {
SocketChannel socket = SocketChannel.open();
socket.configureBlocking(false);
socket.connect(remote);
//							socket.finishConnect();
socket.register(selector, SelectionKey.OP_CONNECT, task);
downLoadCount ++;
} catch (IOException e) {
e.printStackTrace();
}
} catch (MalformedURLException e1) {
e1.printStackTrace();
}
}
}
}
}

public static void main(String[] args) throws Exception{
NIODownLoad down = new NIODownLoad();
//		for(int i=0;i<10000;i++) {
Task task = new T("http://www.baidu.com/img/baidu_sylogo1.gif");
down.add(task);
down.add(new T("http://www.google.com.hk/intl/zh-CN/images/logo_cn.png"));
//		}
down.start();
//		down.add(new T("http://www.google.com.hk/intl/zh-CN/images/logo_cn.png"));
////
//////		Task task = new T("http://www.google.com.hk/intl/zh-CN/images/logo_cn.png");
//////		Task task = new T("http://www.google.com.hk/intl/zh-CN/images/logo_cn.png");
//////		Task task = new T("http://www.google.com.hk/intl/zh-CN/images/logo_cn.png");
//		System.out.println(new URL("http://www.google.com.hk").getPath());
}
}

class T implements Task {
private String url;
private String name;
T(String url) {
this.url = url;
String n = "";
try {
URL u = new URL(url);
String path = u.getPath();
if(path.length() == 0) {
n = "/index.html";
}else {
int p = path.lastIndexOf('/');
if(p != -1) {
n = path.substring(p);
}
//				n = path;
}
} catch (MalformedURLException e) {
e.printStackTrace();
}
//		name = System.currentTimeMillis()+"_"+((int)(Math.random() * 1000))+"_"+n;
name = n;
}
@Override
public String getNetWorkPath() {
return url;
}

@Override
public String getName() {
return name;
}

@Override
public void success(File file) {
System.out.println(file.getAbsolutePath());
}

@Override
public void fail(String netWorkPath) {
System.out.println("ERROR   "+ netWorkPath);
}

}

package l.test1.queue.nio;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class DownLoadSession {

public static final String basePath = new File("/sdcard/test/photos/").getAbsolutePath();
private int bufSize = 512;
private SocketChannel client;
private Task task;
private ByteBuffer buf;
private FileChannel fileChannel;
private File file;
private boolean headEnd = false;
private byte[] headCheckCache = new byte[4];
private int headCheckIndex = 0;

private ByteBuffer head;
private File newFile;

public DownLoadSession(SocketChannel client, Task task) {
this.client = client;
this.task = task;
buf = ByteBuffer.allocate(bufSize);
try {
newFile = new File(basePath + "/" + task.getName());
file = new File(basePath +"/"+task.getName()+".data");
if(!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
fileChannel = new FileOutputStream(file).getChannel();
builderHead();
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}

private void builderHead() {
try {
URL url = new URL(task.getNetWorkPath());
String host = url.getHost();
String path = url.getPath();
if(path.length() == 0) {
path = "/";
}
String query = url.getQuery();
StringBuilder builder = new StringBuilder("GET ")
.append(String.format("%s%s%s", path,null==query?"":"?",null==query?"":query))
.append(" HTTP/1.1\r\nHOST: "+host+"\r\nConnection: close\r\n\r\n");
head = ByteBuffer.wrap(builder.toString().getBytes());
} catch (MalformedURLException e) {
e.printStackTrace();
}

}

public int read() throws IOException{
try{
int readBytes = client.read(buf);
if(readBytes > 0) {
buf.flip();
if(headEnd) {
fileChannel.write(buf);
} else {
readHead();
if(buf.hasRemaining()) {
fileChannel.write(buf);
}
}
buf.compact();
}else if(readBytes == -1) {
fileChannel.close();
file.renameTo(newFile);
task.success(newFile);
}
return readBytes;
}catch(IOException e){
fileChannel.close();
file.delete();
throw e;
}
}

private void readHead() {
while(buf.hasRemaining() && !headEnd) {
byte b = buf.get();
//			if(headCheckIndex == 3) {
//				for(int i=0;i<headCheckIndex;i++) {
//					headCheckCache[i] = headCheckCache[i+1];
//				}
//			}
headCheckCache[headCheckIndex] = b;
headEnd = checkHead();
if(headEnd) {
break;
}
if(headCheckIndex < 3) {
headCheckIndex ++;
}
else {
headCheckIndex = 0;
}

}
}

private boolean checkHead() {
int i = Math.abs(headCheckIndex - 3);
//		int i = 0;
return headCheckCache[i++ % 4] == '\r' && headCheckCache[i++ % 4] == '\n'
&& headCheckCache[i++ % 4] == '\r' && headCheckCache[i % 4] == '\n';
}

public int write() throws IOException{
if(null != client && null != head && head.hasRemaining()) {
return client.write(head);
}
return -1;
}

}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: