您的位置:首页 > Web前端

(三)BufferedInputStream源码学习

2017-11-29 15:41 381 查看
BufferedInputStream是一个组合流,主要对其他输入流提供缓存功能,所以构造函数中都需要传入一个InputStream的实例。原理就是从底层输入流中一次性读取一定长度的字节存储到缓存数组中,减少底层输入流调用I/O资源。

源码分析

构造函数

从仅有的两个构造函数可以知道它需要一个InputStream引用,并且可以设置缓存区大小(默认大小8192)

private static int defaultBufferSize = 8192;

//缓存数组
protected volatile byte buf[];

public BufferedInputStream(InputStream in) {
this(in, defaultBufferSize);
}

//允许自定义缓存区大小
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}


read函数分析

//读取单字节
public synchronized int read() throws IOException {
if (pos >= count) {//若缓存区内容已全部读完,则重新填充缓存区
fill();
if (pos >= count)//底层输出流数据已全部读完
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}


//一次性读取一个字节数组
public synchronized int read(byte b[], int off, int len)
throws IOException
{
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

//循环读取直到读取到规定长度数据或剩余的所有数据(长度不足len)
int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}

//按指定长度和偏移位置读取一个字节数组(有多少读多少)
private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;//缓存区未读数据长度
if (avail <= 0) {//数据都读完了,要重新填充缓存区
//英文注释解释的很清楚了:未指定markPos情况下
//如果一次性读取的长度比缓存区长度大,则
//直接调用底层输入流read()函数返回。
//避免数据复制带来的效率问题(底层输入流->缓存区buf->指定数组b)
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();

avail = count - pos;
//判断底层输入流数据是否全部读完
if (avail <= 0) return -1;
}
//用户实际读取长度
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}


重点函数fill分析

private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0;            /* no mark: throw away the buffer */
else if (pos >= buffer.length)  /* no room left in buffer */
if (markpos > 0) {  /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1;   /* buffer got too big, invalidate mark */
pos = 0;        /* drop buffer contents */
} else {            /* grow buffer */
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}

//源码函数为了简洁的原因省略了一些无效的判断语句,为了可读性我画蛇添足一下

private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)//没有复位标记,很简单,直接填充
pos = 0;            /* no mark: throw away the buffer */
else if(markpos >= 0){//markpos > 0 和 markpos == 0 是两种不同情况的
//这边要记住fill()被调用一定是因为 pos >= count 的原因
//if(pos < buffer.length)这种情况不做特别处理,什么意思?
//表明从buf[pos]-buf[buffer.length-1]这一段空间没有存放过数据的。
//缓存区有位置存放新数据,当然不做特殊处理了

if (pos >= buffer.length){//缓存区已经没位置放新数据啦
if (markpos > 0) {//将buf[markpos]-buf[pos]之间的数据挪到以buf头部
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if( markpos == 0){
//markpos==0有什么含义?
if (buffer.length >= marklimit) {
//复位标识失效,pos >= buffer.length >= marklimit  ==》 pos >= marklimit
//根据参数说明可以知道复位设置失效了

markpos = -1;   /* buffer got too big, invalidate mark */
pos = 0;        /* drop buffer contents */
} else {
//需要保留的数据长度比缓存区大,需要进行缓存区扩容
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {

throw new IOException("Stream closed");
}
buffer = nbuf;
}
}

}

}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}


标记和复位

//标记一个重读位置
public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}

//复位到markpos位置重新读取
public synchronized void reset() throws IOException {
getBufIfOpen(); // Cause exception if closed
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}

public boolean markSupported() {
return true;
}


如果设置mark(limit)以后读取的字节数超过limit,markpos一定失效吗?

:虽然文档上是这样说的,但实际上这与缓存区大小有关。若设置mark(limit)以后,读取的字节长度没有超过缓存区的长度,仍然可以复位重读。以上说法还不够全面,还有一个例外,若在没有调用过read()函数的情况下调用mark(limit)(markpos == 0)读取的字节长度超过markpos,是失效

完整源码

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public
class BufferedInputStream extends FilterInputStream {

private static int defaultBufferSize = 8192;

//缓存数组
protected volatile byte buf[];

//保证对缓存区操作的原子性
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class,  byte[].class, "buf");

//缓存长度;与buf.length区别,缓存区不一定充满
protected int count;

//读取下标,标记下一个未读的数组下标
protected int pos;

//标记下标,用于重读。-1表示失效
protected int markpos = -1;

//重读的最大长度
protected int marklimit;

//获取底层输入流
private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null)
throw new IOException("Stream closed");
return input;
}

//获取缓存区数组
private byte[] getBufIfOpen() throws IOException {
byte[] buffer = buf;
if (buffer == null)
throw new IOException("Stream closed");
return buffer;
}

//构造函数必须以一个InputStream实例作为入参

public BufferedInputStream(InputStream in) {
this(in, defaultBufferSize);
}

//允许自定义缓存区大小
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}

//从底层输入流读取数据到缓存区
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0;            /* no mark: throw away the buffer */
else if (pos >= buffer.length)  /* no room left in buffer */
if (markpos > 0) {  /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1;   /* buffer got too big, invalidate mark */
pos = 0;        /* drop buffer contents */
} else {            /* grow buffer */
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}

//读取一个字节
public synchronized int read() throws IOException {
if (pos >= count) {若缓存区内容已全部读完,则重新填充缓存区
fill();
if (pos >= count)//底层输出流数据已全部读完
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}

//按指定长度和偏移位置读取一个字节数组(有多少读多少)
private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;//缓存区未读数据长度
if (avail <= 0) {//数据都读完了,要重新填充缓存区
/* If the requested length is at least as large as the buffer, and
if there is no mark/reset activity, do not bother to copy the
bytes into the local buffer.  In this way buffered streams will
cascade harmlessly. */
//英文注释解释的很清楚了:未指定markPos情况下,如果一次性读取的长度比缓存区长度大,则
//直接调用底层输入流read()函数返回。避免数据复制带来的效率问题(底层输入流->缓存区buf->指定数组b)
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();

avail = count - pos;
//判断底层输入流数据是否全部读完
if (avail <= 0) return -1;
}
//用户实际读取长度
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}

//外部调用接口,一次性读取一个字节数组
public synchronized int read(byte b[], int off, int len)
throws IOException
{
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

//循环读取直到读取到规定长度数据或剩余的所有数据(长度不足len)
int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}

//跳过n个字节
public synchronized long skip(long n) throws IOException {
getBufIfOpen(); // Check for closed stream
if (n <= 0) {
return 0;
}
long avail = count - pos;

if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos <0)
return getInIfOpen().skip(n);

// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0)
return 0;
}

long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}

//还有多少可读数据
public synchronized int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}

//标记一个重读位置
public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}

//复位到markpos位置重新读取
public synchronized void reset() throws IOException {
getBufIfOpen(); // Cause exception if closed
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}

public boolean markSupported() {
return true;
}

public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: