您的位置:首页 > Web前端

Java IO系列5 字节流之BufferedInputStream

2015-12-25 17:23 465 查看

BufferedInputStream

BufferedInputStream 是一个带有内存缓冲的 InputStream.使普通的文件输入流具备了内存缓存的功能,通过内存缓冲减少磁盘io次数。

public
class BufferedInputStream extends FilterInputStream {

//该变量定义了默认的缓冲大小  2^13 = 2^10*2^3 = 8K
private static int defaultBufferSize = 8192;
//缓冲数组,注意该成员变量同样使用了volatile关键字进行修饰,作用为在多线程环境中,当对该变量引用进行修改时保证了内存的可见性。
protected volatile byte buf[];

//    AtomicReferenceFieldUpdater是一个抽象类,但该类的内部已经给出了包访问控制级别的一个实现AtomicReferenceFieldUpdaterImpl,
//    原理是利用反射将一个 被声明成volatile 的属性通过JNI调用,使用cpu指令级的命令将一个变量进行更新,保障该操作是原子的。也就是通过上面定义
//    的bufUpdater将buf这个byte数组的跟新变为原子操作,其作用是保障其原子更新。

//    BufferedInputStream源代码中总共有两个地方用到了这个bufUpdater,一个是我们上面看到的close方法中,另外一个是再前面说道的fill()方法中。
//    既然BufferedInputStream的所有操作上都用了synchronized来做同步,那为什么这里还需要用这个原子更新器呢?带着问题上面提到过fill()方法中
//    的最后一个步骤:当有mark,而且markLimit的长度又大于初始数组的长度时,需要对内存数组扩容,即创建一个尺寸更大的数组,将原来数组中的数据拷贝到
//    新数组中,再将指向原数组的应用指向新的数组。bufUpdater正是用在了将原数组引用指向新数组的操作上,同样close的方法使用的bufUpdater也是用在
//    对数组引用的改变上,这样看来就比较清晰了,主要是为了防止一个线程在执行close方法时,将buffer赋值为null这个时候另外一个线程正在执行fill()方
//    法的最后一个步骤又将buffer赋值给了一个新的数组,从而导致资源没有释放掉。
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class,  byte[].class, "buf");

//该成员变量表示目前缓冲区域中有多少有效的字节。
protected int count;

//该成员变量表示了当前缓冲区的读取位置。
protected int pos;

//表示标记位置,该标记位置的作用为:实现流的标记特性,即流的某个位置可以被设置为标记,允许通过设置reset(),将流的读取位置进行重置到该标记位置,
//    但是InputStream注释上明确表示,该流不会无限的保证标记长度可以无限延长,即markpos=15,pos=139734,该保留区间可能已经超过了保留的极限
protected int markpos = -1;

//该成员变量表示了上面提到的标记最大保留区间大小,当pos-markpos> marklimit时,mark标记可能会被清除(根据实现确定)
protected int marklimit;

private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null)
throw new IOException("Stream closed");
return input;
}

private byte[] getBufIfOpen() throws IOException {
byte[] buffer = buf;
if (buffer == null)
throw new IOException("Stream closed");
return buffer;
}

public BufferedInputStream(InputStream in) {
this(in, defaultBufferSize);
}

public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}

private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0;            /* no mark: throw away the buffer */
else if (pos >= buffer.length)  /* no room left in buffer */
if (markpos > 0) {  /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1;   /* buffer got too big, invalidate mark */
pos = 0;        /* drop buffer contents */
} else {            /* grow buffer */
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}

public synchronized int read() throws IOException {
if (pos >= count) {
fill();
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}

private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;
if (avail <= 0) {
/* If the requested length is at least as large as the buffer, and
if there is no mark/reset activity, do not bother to copy the
bytes into the local buffer.  In this way buffered streams will
cascade harmlessly. */
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();
avail = count - pos;
if (avail <= 0) return -1;
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}

public synchronized int read(byte b[], int off, int len)
throws IOException
{
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}

public synchronized long skip(long n) throws IOException {
getBufIfOpen(); // Check for closed stream
if (n <= 0) {
return 0;
}
long avail = count - pos;

if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos <0)
return getInIfOpen().skip(n);

// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0)
return 0;
}

long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}

public synchronized int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}

public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}

public synchronized void reset() throws IOException {
getBufIfOpen(); // Cause exception if closed
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}

public boolean markSupported() {
return true;
}

public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
}


fill()方法有很大一段是关于markpos的处理,其处理过程大致如下图:

a.没有markpos的情况很简单:



b.有mark的情况比较复杂:



返回值中还有一个细节是getBufIfOpen()[pos++],直接将pos++来获取下一个未读取的数据,这里涉及到的两个元素:一个内存数组,一个当前读取的数据下标都是全局变量,pos++也不是线程安全。那么BufferedInputStream如何保证对内存缓冲数组的操作线程安全?源码中有操作的public方法除了close方法之外,其他方法上都加上了synchronized关键字,以保障上面描述的整个内存缓存数组的操作是线程安全的。但为什么close方法没有synchronized,我们看这个方法做了些什么事情:

byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}


简单来看做了两个操作:把内存数组置为null,将引用的inputStream置为null,同时将引用的inputStream.close();

这两个操作的核心都是关闭原始流,释放资源,如果加了synchronized关键字,会导致当前线程正在执行read方法,而且系统消耗很大时,想释放资源无法释放。此时read方法还没执行完,我们知道synchronized的锁是加在整个对象上的,所以close方法就必须等到read结束后才能执行,这样很明显不能满足close的需求,甚至会导致大量的io资源被阻塞不能关闭。

但该方法用一个while循环,而且只有当bufUpdater.compareAndSet(this, buffer, null)成功时,才执行上述的资源释放。

参考:/article/3886869.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: