JAVA实现环形缓冲多线程读取远程文件
2010-02-10 19:32
441 查看
如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。
简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。
经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。
本次修正了几处可能产生死锁的情况,源码中加入一些注解,略去了一些和本文无关的方法实现。
一、HttpReader类功能:HTTP协议从指定URL读取数据。
Java代码
package instream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
public final class HttpReader {
public static final int MAX_RETRY = 10;
private URL url;
private HttpURLConnection httpConnection;
private InputStream in_stream;
private long cur_pos; //决定seek方法中是否执行文件读取定位
private int connect_timeout;
private int read_timeout;
public HttpReader(URL u) {
this(u, 5000, 5000);
}
public HttpReader(URL u, int connect_timeout, int read_timeout) {
this.connect_timeout = connect_timeout;
this.read_timeout = read_timeout;
url = u;
}
public int read(byte[] b, int off, int len) throws IOException {
int r = in_stream.read(b, off, len);
cur_pos += r;
return r;
}
public int getData(byte[] b, int off, int len) throws IOException {
//...
}
public void close() {
//...
}
/*
* 抛出异常通知重试.
* 例如响应码503可能是由某种暂时的原因引起的,同一IP频繁的连接请求会遭服务器拒绝.
*/
public void seek(long start_pos) throws IOException {
if (start_pos == cur_pos && in_stream != null)
return;
if (httpConnection != null) {
httpConnection.disconnect();
httpConnection = null;
}
if (in_stream != null) {
in_stream.close();
in_stream = null;
}
httpConnection = (HttpURLConnection) url.openConnection();
httpConnection.setConnectTimeout(connect_timeout);
httpConnection.setReadTimeout(read_timeout);
String sProperty = "bytes=" + start_pos + "-";
httpConnection.setRequestProperty("Range", sProperty);
//httpConnection.setRequestProperty("Connection", "Keep-Alive");
int responseCode = httpConnection.getResponseCode();
if (responseCode < 200 || responseCode >= 300) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new IOException("HTTP responseCode="+responseCode);
}
in_stream = httpConnection.getInputStream();
cur_pos = start_pos;
//System.out.println(Thread.currentThread().getName()+ ", cur_pos="+cur_pos);
}
}
二、IWriterCallBack接口
Java代码
package instream;
/*
* 读/写通信接口.类似于C++的回调函数
*
* 例:
* class BuffRandAcceURL 内实现本接口的方法tryWriting()
* class BuffRandAcceURL 内new Writer(this, ...)传值到Writer.icb
* class Writer 内调用icb.tryWriting()
*/
public interface IWriterCallBack {
public int tryWriting() throws InterruptedException;
public void updateBuffer(int i, int len);
public void updateWriterCount();
public int getWriterCount();
public void terminateWriters();
}
三、Writer类:下载线程,负责向buf[]写数据。
Java代码
package instream;
import java.net.URL;
public final class Writer implements Runnable {
private static boolean isalive = true; // 一个线程超时其它线程也能退出
private static byte[] buf;
private static IWriterCallBack icb;
private HttpReader hr;
public Writer(IWriterCallBack cb, URL u, byte[] b, int i) {
hr = new HttpReader(u);
icb = cb;
buf = b;
Thread t = new Thread(this,"dt_"+i);
t.setPriority(Thread.NORM_PRIORITY + 1);
t.start();
}
public void run() {
int wbytes=0, wpos=0, rema = 0, retry = 0;
int idxmask = BuffRandAcceURL.UNIT_COUNT - 1;
boolean cont = true;
int index = 0; //buf[]内"块"索引号
int startpos = 0; //index对应的文件位置(相对于文件首的偏移量)
long time0 = 0;
while (cont) {
try {
// 1.等待空闲块
if(retry == 0) {
if ((startpos = icb.tryWriting()) == -1)
break;
index = (startpos >> BuffRandAcceURL.UNIT_LENGTH_BITS) & idxmask;
wpos = startpos & BuffRandAcceURL.BUF_LENGTH_MASK;
wbytes = 0;
rema = BuffRandAcceURL.UNIT_LENGTH;
time0 = System.currentTimeMillis();
}
// 2.定位
hr.seek(startpos);
// 3.下载"一块"
int w;
while (rema > 0 && isalive) {
w = (rema < 2048) ? rema : 2048; //每次读几K合适?
if ((w = hr.read(buf, wpos, w)) == -1) {
cont = false;
break;
}
rema -= w;
wpos += w;
startpos += w; // 能断点续传
wbytes += w;
}
// 下载速度足够快就结束本线程
long t = System.currentTimeMillis() - time0;
if(icb.getWriterCount() > 1 && t < 500)
cont = false;
//4.通知"读"线程
retry = 0;
icb.updateBuffer(index, wbytes);
} catch (Exception e) {
if(++retry == HttpReader.MAX_RETRY) {
isalive = false;
icb.terminateWriters();
break;
}
}
}
icb.updateWriterCount();
try {
hr.close();
} catch (Exception e) {}
hr = null;
}
}
四、IRandomAccess接口:随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。
Java代码
package instream;
public interface IRandomAccess {
public int read() throws Exception;
public int read(byte b[]) throws Exception;
public int read(byte b[], int off, int len) throws Exception;
public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;
public void seek(long pos) throws Exception;
public long length();
public long getFilePointer();
public void close();
}
五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。
关键是如何简单有效防止死锁?
Java代码
package instream;
import java.net.URL;
import java.net.URLDecoder;
import tag.ID3Tag;
/*
* FIFO方式共享环形缓冲区buf[]
* buf[]逻辑上分成16块, 每一块的长度UNIT_LENGTH=32K不小于最大帧长1732
*
* 同步: 写/写 -- 互斥等待空闲块
* 读/写 -- 并发访问buf[]
*
*/
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {
public static final int UNIT_LENGTH_BITS = 15;
public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; //2^16=32K
public static final int BUF_LENGTH = UNIT_LENGTH << 4;
public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; //16块
public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);
private static final int MAX_WRITER = 5;
private static long file_pointer;
private static int read_pos;
private static int fill_bytes;
private static byte[] buf; //同时作写线程同步锁
private static int[] unit_bytes; //同时作读线程互斥锁
private static int alloc_pos;
private static URL url;
private static boolean isalive = true;
private static int writer_count;
private static long file_length;
private static long frame_bytes;
private static int free_unit = UNIT_COUNT; // "信号量"计数器
public BuffRandAcceURL(String sURL) throws Exception {
this(sURL,MAX_WRITER);
}
public BuffRandAcceURL(String sURL, int download_threads) throws Exception {
buf = new byte[BUF_LENGTH];
unit_bytes = new int[UNIT_COUNT];
url = new URL(sURL);
// 打印文件名
try {
String s = URLDecoder.decode(sURL, "GBK");
System.out.println(s.substring(s.lastIndexOf("/") + 1));
} catch (Exception e) {
System.out.println(sURL);
}
// 获取文件长度
// 为何同一URL得到的文件长度有时对有时又不对?
frame_bytes = file_length = url.openConnection().getContentLength();
if (file_length == -1)
throw new Exception("ContentLength=-1");
// 创建线程以异步方式解析tag
new TagThread(url, file_length);
// 创建"写"线程
// 线程被创建后立即连接URL开始下载,由于服务器限制了同一IP每秒最大连接次数,频繁连接
// 会被服务器拒绝,因此延时.
writer_count = download_threads;
for (int i = 0; i < download_threads; i++) {
new Writer(this, url, buf, i + 1);
Thread.sleep(200);
}
// 缓冲
try_cache();
// 跳过 ID3 v2
ID3Tag tag = new ID3Tag();
int v2_size = tag.checkID3V2(buf, 0);
if (v2_size > 0) {
frame_bytes -= v2_size;
seek(v2_size);
}
tag = null;
}
/*
* 缓冲
*/
private void try_cache() throws InterruptedException {
int cache_size = BUF_LENGTH;
int bi = unit_bytes[read_pos >> UNIT_LENGTH_BITS];
if(bi != 0)
cache_size -= UNIT_LENGTH - bi;
while (fill_bytes < cache_size) {
if (writer_count == 0 || isalive == false)
return;
System.out.printf("/r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);
synchronized (unit_bytes) {
unit_bytes.wait(200); //wait(200)错过通知也可结束循环?
}
}
System.out.printf("/r");
}
private int try_reading(int i, int len) throws Exception {
int n = (i + 1) & (UNIT_COUNT - 1);
int r = (unit_bytes[i] > 0) ? (unit_bytes[i] + unit_bytes
) : unit_bytes[i];
if (r < len) {
if (writer_count == 0 || isalive == false)
return r;
try_cache();
}
return len;
}
/*
* 各个"写"线程互斥等待空闲块
* 空闲块按由小到大的顺序分配;管理空闲块采用类似于C++的信号量机制.
*/
public int tryWriting() throws InterruptedException {
int ret = -1;
synchronized (buf) {
while (free_unit == 0 && isalive)
buf.wait();
if(alloc_pos >= file_length || isalive == false)
return -1;
ret = alloc_pos;
alloc_pos += UNIT_LENGTH;
free_unit--;
}
return ret;
}
/*
* "写"线程向buf[]写完数据后调用,通知"读"线程
*/
public void updateBuffer(int i, int len) {
synchronized (unit_bytes) {
unit_bytes[i] = len;
fill_bytes += len;
unit_bytes.notify();
}
}
/*
* "写"线程准备退出时调用
*/
public void updateWriterCount() {
synchronized (unit_bytes) {
writer_count--;
unit_bytes.notify();
}
}
public int getWriterCount() {
return writer_count;
}
/*
* read方法内调用
*/
public void notifyWriter() {
synchronized (buf) {
buf.notifyAll();
}
}
/*
* 被某个"写"线程调用,用于终止其它"写"线程;isalive也影响"读"线程流程
*/
public void terminateWriters() {
synchronized (unit_bytes) {
if (isalive) {
isalive = false;
System.out.println("/n读取文件超时。重试 " + HttpReader.MAX_RETRY
+ " 次后放弃,请您稍后再试。");
}
unit_bytes.notify();
}
notifyWriter();
}
public int read() throws Exception {
//...
}
public int read(byte b[]) throws Exception {
return read(b, 0, b.length);
}
public int read(byte[] b, int off, int len) throws Exception {
int i = read_pos >> UNIT_LENGTH_BITS;
// 1.等待有足够内容可读
if(try_reading(i, len) < len || isalive == false)
return -1;
// 2.读取
int tail = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH
if (tail < len) {
System.arraycopy(buf, read_pos, b, off, tail);
System.arraycopy(buf, 0, b, off + tail, len - tail);
} else
System.arraycopy(buf, read_pos, b, off, len);
fill_bytes -= len;
file_pointer += len;
read_pos += len;
read_pos &= BUF_LENGTH_MASK;
unit_bytes[i] -= len;
if (unit_bytes[i] < 0) {
int ni = read_pos >> UNIT_LENGTH_BITS;
unit_bytes[ni] += unit_bytes[i];
unit_bytes[i] = 0;
free_unit++;
notifyWriter();
} else if (unit_bytes[i] == 0) {
free_unit++; // 空闲块"信号量"计数加1
notifyWriter(); // 3.通知
}
// 如果下一块未填满,意味着文件读完,第1步已处理一次读空两块的情况
return len;
}
/*
* 从buf[]偏移src_off位置复制,不移动文件"指针",不发信号.
*/
public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {
//...
}
public long length() {
//...
}
public long getFilePointer() {
//...
}
public void close() {
//...
}
/*
* 随机读取定位
*/
public void seek(long pos) throws Exception {
//...
}
}
简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。
经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。
本次修正了几处可能产生死锁的情况,源码中加入一些注解,略去了一些和本文无关的方法实现。
一、HttpReader类功能:HTTP协议从指定URL读取数据。
Java代码
package instream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
public final class HttpReader {
public static final int MAX_RETRY = 10;
private URL url;
private HttpURLConnection httpConnection;
private InputStream in_stream;
private long cur_pos; //决定seek方法中是否执行文件读取定位
private int connect_timeout;
private int read_timeout;
public HttpReader(URL u) {
this(u, 5000, 5000);
}
public HttpReader(URL u, int connect_timeout, int read_timeout) {
this.connect_timeout = connect_timeout;
this.read_timeout = read_timeout;
url = u;
}
public int read(byte[] b, int off, int len) throws IOException {
int r = in_stream.read(b, off, len);
cur_pos += r;
return r;
}
public int getData(byte[] b, int off, int len) throws IOException {
//...
}
public void close() {
//...
}
/*
* 抛出异常通知重试.
* 例如响应码503可能是由某种暂时的原因引起的,同一IP频繁的连接请求会遭服务器拒绝.
*/
public void seek(long start_pos) throws IOException {
if (start_pos == cur_pos && in_stream != null)
return;
if (httpConnection != null) {
httpConnection.disconnect();
httpConnection = null;
}
if (in_stream != null) {
in_stream.close();
in_stream = null;
}
httpConnection = (HttpURLConnection) url.openConnection();
httpConnection.setConnectTimeout(connect_timeout);
httpConnection.setReadTimeout(read_timeout);
String sProperty = "bytes=" + start_pos + "-";
httpConnection.setRequestProperty("Range", sProperty);
//httpConnection.setRequestProperty("Connection", "Keep-Alive");
int responseCode = httpConnection.getResponseCode();
if (responseCode < 200 || responseCode >= 300) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new IOException("HTTP responseCode="+responseCode);
}
in_stream = httpConnection.getInputStream();
cur_pos = start_pos;
//System.out.println(Thread.currentThread().getName()+ ", cur_pos="+cur_pos);
}
}
package instream; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; public final class HttpReader { public static final int MAX_RETRY = 10; private URL url; private HttpURLConnection httpConnection; private InputStream in_stream; private long cur_pos; //决定seek方法中是否执行文件读取定位 private int connect_timeout; private int read_timeout; public HttpReader(URL u) { this(u, 5000, 5000); } public HttpReader(URL u, int connect_timeout, int read_timeout) { this.connect_timeout = connect_timeout; this.read_timeout = read_timeout; url = u; } public int read(byte[] b, int off, int len) throws IOException { int r = in_stream.read(b, off, len); cur_pos += r; return r; } public int getData(byte[] b, int off, int len) throws IOException { //... } public void close() { //... } /* * 抛出异常通知重试. * 例如响应码503可能是由某种暂时的原因引起的,同一IP频繁的连接请求会遭服务器拒绝. */ public void seek(long start_pos) throws IOException { if (start_pos == cur_pos && in_stream != null) return; if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null; } if (in_stream != null) { in_stream.close(); in_stream = null; } httpConnection = (HttpURLConnection) url.openConnection(); httpConnection.setConnectTimeout(connect_timeout); httpConnection.setReadTimeout(read_timeout); String sProperty = "bytes=" + start_pos + "-"; httpConnection.setRequestProperty("Range", sProperty); //httpConnection.setRequestProperty("Connection", "Keep-Alive"); int responseCode = httpConnection.getResponseCode(); if (responseCode < 200 || responseCode >= 300) { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } throw new IOException("HTTP responseCode="+responseCode); } in_stream = httpConnection.getInputStream(); cur_pos = start_pos; //System.out.println(Thread.currentThread().getName()+ ", cur_pos="+cur_pos); } }
二、IWriterCallBack接口
Java代码
package instream;
/*
* 读/写通信接口.类似于C++的回调函数
*
* 例:
* class BuffRandAcceURL 内实现本接口的方法tryWriting()
* class BuffRandAcceURL 内new Writer(this, ...)传值到Writer.icb
* class Writer 内调用icb.tryWriting()
*/
public interface IWriterCallBack {
public int tryWriting() throws InterruptedException;
public void updateBuffer(int i, int len);
public void updateWriterCount();
public int getWriterCount();
public void terminateWriters();
}
package instream; /* * 读/写通信接口.类似于C++的回调函数 * * 例: * class BuffRandAcceURL 内实现本接口的方法tryWriting() * class BuffRandAcceURL 内new Writer(this, ...)传值到Writer.icb * class Writer 内调用icb.tryWriting() */ public interface IWriterCallBack { public int tryWriting() throws InterruptedException; public void updateBuffer(int i, int len); public void updateWriterCount(); public int getWriterCount(); public void terminateWriters(); }
三、Writer类:下载线程,负责向buf[]写数据。
Java代码
package instream;
import java.net.URL;
public final class Writer implements Runnable {
private static boolean isalive = true; // 一个线程超时其它线程也能退出
private static byte[] buf;
private static IWriterCallBack icb;
private HttpReader hr;
public Writer(IWriterCallBack cb, URL u, byte[] b, int i) {
hr = new HttpReader(u);
icb = cb;
buf = b;
Thread t = new Thread(this,"dt_"+i);
t.setPriority(Thread.NORM_PRIORITY + 1);
t.start();
}
public void run() {
int wbytes=0, wpos=0, rema = 0, retry = 0;
int idxmask = BuffRandAcceURL.UNIT_COUNT - 1;
boolean cont = true;
int index = 0; //buf[]内"块"索引号
int startpos = 0; //index对应的文件位置(相对于文件首的偏移量)
long time0 = 0;
while (cont) {
try {
// 1.等待空闲块
if(retry == 0) {
if ((startpos = icb.tryWriting()) == -1)
break;
index = (startpos >> BuffRandAcceURL.UNIT_LENGTH_BITS) & idxmask;
wpos = startpos & BuffRandAcceURL.BUF_LENGTH_MASK;
wbytes = 0;
rema = BuffRandAcceURL.UNIT_LENGTH;
time0 = System.currentTimeMillis();
}
// 2.定位
hr.seek(startpos);
// 3.下载"一块"
int w;
while (rema > 0 && isalive) {
w = (rema < 2048) ? rema : 2048; //每次读几K合适?
if ((w = hr.read(buf, wpos, w)) == -1) {
cont = false;
break;
}
rema -= w;
wpos += w;
startpos += w; // 能断点续传
wbytes += w;
}
// 下载速度足够快就结束本线程
long t = System.currentTimeMillis() - time0;
if(icb.getWriterCount() > 1 && t < 500)
cont = false;
//4.通知"读"线程
retry = 0;
icb.updateBuffer(index, wbytes);
} catch (Exception e) {
if(++retry == HttpReader.MAX_RETRY) {
isalive = false;
icb.terminateWriters();
break;
}
}
}
icb.updateWriterCount();
try {
hr.close();
} catch (Exception e) {}
hr = null;
}
}
package instream; import java.net.URL; public final class Writer implements Runnable { private static boolean isalive = true; // 一个线程超时其它线程也能退出 private static byte[] buf; private static IWriterCallBack icb; private HttpReader hr; public Writer(IWriterCallBack cb, URL u, byte[] b, int i) { hr = new HttpReader(u); icb = cb; buf = b; Thread t = new Thread(this,"dt_"+i); t.setPriority(Thread.NORM_PRIORITY + 1); t.start(); } public void run() { int wbytes=0, wpos=0, rema = 0, retry = 0; int idxmask = BuffRandAcceURL.UNIT_COUNT - 1; boolean cont = true; int index = 0; //buf[]内"块"索引号 int startpos = 0; //index对应的文件位置(相对于文件首的偏移量) long time0 = 0; while (cont) { try { // 1.等待空闲块 if(retry == 0) { if ((startpos = icb.tryWriting()) == -1) break; index = (startpos >> BuffRandAcceURL.UNIT_LENGTH_BITS) & idxmask; wpos = startpos & BuffRandAcceURL.BUF_LENGTH_MASK; wbytes = 0; rema = BuffRandAcceURL.UNIT_LENGTH; time0 = System.currentTimeMillis(); } // 2.定位 hr.seek(startpos); // 3.下载"一块" int w; while (rema > 0 && isalive) { w = (rema < 2048) ? rema : 2048; //每次读几K合适? if ((w = hr.read(buf, wpos, w)) == -1) { cont = false; break; } rema -= w; wpos += w; startpos += w; // 能断点续传 wbytes += w; } // 下载速度足够快就结束本线程 long t = System.currentTimeMillis() - time0; if(icb.getWriterCount() > 1 && t < 500) cont = false; //4.通知"读"线程 retry = 0; icb.updateBuffer(index, wbytes); } catch (Exception e) { if(++retry == HttpReader.MAX_RETRY) { isalive = false; icb.terminateWriters(); break; } } } icb.updateWriterCount(); try { hr.close(); } catch (Exception e) {} hr = null; } }
四、IRandomAccess接口:随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。
Java代码
package instream;
public interface IRandomAccess {
public int read() throws Exception;
public int read(byte b[]) throws Exception;
public int read(byte b[], int off, int len) throws Exception;
public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;
public void seek(long pos) throws Exception;
public long length();
public long getFilePointer();
public void close();
}
package instream; public interface IRandomAccess { public int read() throws Exception; public int read(byte b[]) throws Exception; public int read(byte b[], int off, int len) throws Exception; public int dump(int src_off, byte b[], int dst_off, int len) throws Exception; public void seek(long pos) throws Exception; public long length(); public long getFilePointer(); public void close(); }
五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。
关键是如何简单有效防止死锁?
Java代码
package instream;
import java.net.URL;
import java.net.URLDecoder;
import tag.ID3Tag;
/*
* FIFO方式共享环形缓冲区buf[]
* buf[]逻辑上分成16块, 每一块的长度UNIT_LENGTH=32K不小于最大帧长1732
*
* 同步: 写/写 -- 互斥等待空闲块
* 读/写 -- 并发访问buf[]
*
*/
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {
public static final int UNIT_LENGTH_BITS = 15;
public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; //2^16=32K
public static final int BUF_LENGTH = UNIT_LENGTH << 4;
public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; //16块
public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);
private static final int MAX_WRITER = 5;
private static long file_pointer;
private static int read_pos;
private static int fill_bytes;
private static byte[] buf; //同时作写线程同步锁
private static int[] unit_bytes; //同时作读线程互斥锁
private static int alloc_pos;
private static URL url;
private static boolean isalive = true;
private static int writer_count;
private static long file_length;
private static long frame_bytes;
private static int free_unit = UNIT_COUNT; // "信号量"计数器
public BuffRandAcceURL(String sURL) throws Exception {
this(sURL,MAX_WRITER);
}
public BuffRandAcceURL(String sURL, int download_threads) throws Exception {
buf = new byte[BUF_LENGTH];
unit_bytes = new int[UNIT_COUNT];
url = new URL(sURL);
// 打印文件名
try {
String s = URLDecoder.decode(sURL, "GBK");
System.out.println(s.substring(s.lastIndexOf("/") + 1));
} catch (Exception e) {
System.out.println(sURL);
}
// 获取文件长度
// 为何同一URL得到的文件长度有时对有时又不对?
frame_bytes = file_length = url.openConnection().getContentLength();
if (file_length == -1)
throw new Exception("ContentLength=-1");
// 创建线程以异步方式解析tag
new TagThread(url, file_length);
// 创建"写"线程
// 线程被创建后立即连接URL开始下载,由于服务器限制了同一IP每秒最大连接次数,频繁连接
// 会被服务器拒绝,因此延时.
writer_count = download_threads;
for (int i = 0; i < download_threads; i++) {
new Writer(this, url, buf, i + 1);
Thread.sleep(200);
}
// 缓冲
try_cache();
// 跳过 ID3 v2
ID3Tag tag = new ID3Tag();
int v2_size = tag.checkID3V2(buf, 0);
if (v2_size > 0) {
frame_bytes -= v2_size;
seek(v2_size);
}
tag = null;
}
/*
* 缓冲
*/
private void try_cache() throws InterruptedException {
int cache_size = BUF_LENGTH;
int bi = unit_bytes[read_pos >> UNIT_LENGTH_BITS];
if(bi != 0)
cache_size -= UNIT_LENGTH - bi;
while (fill_bytes < cache_size) {
if (writer_count == 0 || isalive == false)
return;
System.out.printf("/r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);
synchronized (unit_bytes) {
unit_bytes.wait(200); //wait(200)错过通知也可结束循环?
}
}
System.out.printf("/r");
}
private int try_reading(int i, int len) throws Exception {
int n = (i + 1) & (UNIT_COUNT - 1);
int r = (unit_bytes[i] > 0) ? (unit_bytes[i] + unit_bytes
) : unit_bytes[i];
if (r < len) {
if (writer_count == 0 || isalive == false)
return r;
try_cache();
}
return len;
}
/*
* 各个"写"线程互斥等待空闲块
* 空闲块按由小到大的顺序分配;管理空闲块采用类似于C++的信号量机制.
*/
public int tryWriting() throws InterruptedException {
int ret = -1;
synchronized (buf) {
while (free_unit == 0 && isalive)
buf.wait();
if(alloc_pos >= file_length || isalive == false)
return -1;
ret = alloc_pos;
alloc_pos += UNIT_LENGTH;
free_unit--;
}
return ret;
}
/*
* "写"线程向buf[]写完数据后调用,通知"读"线程
*/
public void updateBuffer(int i, int len) {
synchronized (unit_bytes) {
unit_bytes[i] = len;
fill_bytes += len;
unit_bytes.notify();
}
}
/*
* "写"线程准备退出时调用
*/
public void updateWriterCount() {
synchronized (unit_bytes) {
writer_count--;
unit_bytes.notify();
}
}
public int getWriterCount() {
return writer_count;
}
/*
* read方法内调用
*/
public void notifyWriter() {
synchronized (buf) {
buf.notifyAll();
}
}
/*
* 被某个"写"线程调用,用于终止其它"写"线程;isalive也影响"读"线程流程
*/
public void terminateWriters() {
synchronized (unit_bytes) {
if (isalive) {
isalive = false;
System.out.println("/n读取文件超时。重试 " + HttpReader.MAX_RETRY
+ " 次后放弃,请您稍后再试。");
}
unit_bytes.notify();
}
notifyWriter();
}
public int read() throws Exception {
//...
}
public int read(byte b[]) throws Exception {
return read(b, 0, b.length);
}
public int read(byte[] b, int off, int len) throws Exception {
int i = read_pos >> UNIT_LENGTH_BITS;
// 1.等待有足够内容可读
if(try_reading(i, len) < len || isalive == false)
return -1;
// 2.读取
int tail = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH
if (tail < len) {
System.arraycopy(buf, read_pos, b, off, tail);
System.arraycopy(buf, 0, b, off + tail, len - tail);
} else
System.arraycopy(buf, read_pos, b, off, len);
fill_bytes -= len;
file_pointer += len;
read_pos += len;
read_pos &= BUF_LENGTH_MASK;
unit_bytes[i] -= len;
if (unit_bytes[i] < 0) {
int ni = read_pos >> UNIT_LENGTH_BITS;
unit_bytes[ni] += unit_bytes[i];
unit_bytes[i] = 0;
free_unit++;
notifyWriter();
} else if (unit_bytes[i] == 0) {
free_unit++; // 空闲块"信号量"计数加1
notifyWriter(); // 3.通知
}
// 如果下一块未填满,意味着文件读完,第1步已处理一次读空两块的情况
return len;
}
/*
* 从buf[]偏移src_off位置复制,不移动文件"指针",不发信号.
*/
public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {
//...
}
public long length() {
//...
}
public long getFilePointer() {
//...
}
public void close() {
//...
}
/*
* 随机读取定位
*/
public void seek(long pos) throws Exception {
//...
}
}
相关文章推荐
- java实现远程文件的读取
- JAVA实现远程文件读取!
- java实现远程储存读取文件
- JAVA实现远程文件读取!
- java 读取远程文件 实现继传。。。
- Http协议+java httpURLConnetion实现远程读取文件+chrome DEBUG Timing
- JAVA实现远程文件读取
- Java远程实现Linux文件内容读取.
- java实现远程连接服务器,监控某个目录下的日志文件
- 用java和JavaScript读取一个JS文件中字符个数的实现
- 用java代码实现从excel表格读取数据然后写入生成国际化配置文件properties
- Java实现读取文件到TextArea中
- Java读取文件 利用MappedByteBuffer进行缓冲
- Java实战(一)—Dom4J读取配置文件实现抽象工厂+反射
- java实现附件预览(openoffice+swftools+flexpaper)(解决jsp读取全盘文件问题)
- Java读取远程文件
- JAVA远程连接linux服务器读取文件(JSch)
- java简单多线程方式+实现文件上传(spring mvc + jquery.form.js 框架)
- Java I/O流之BufferedReader实现读取文件
- Java 实现按行读取文件并且将行中的重复数据删除