[重学Java基础][Java IO流][Part.12] [Part.12]缓冲字节输入输出流
===
BufferedInputStream
概述
BufferedInputStream继承于FilterInputStream,
FilterInputStream 从名字上就可以看出 是个过滤流 类似于FilterReader
用来“封装其它的输入流,并为它们提供额外的功能”。
BufferedInputStream的作用就是为“输入流提供缓冲功能,允许每次读入一批数据 并且提供了按行读取功能 通过包装InputStream对象来发挥作用 很明显 这是一个处理流 包装流 一般包装ByteArrayInputStream,system.in对象
源码解析
成员函数
默认的缓冲大小 为8192
private static int DEFAULT_BUFFER_SIZE = 8192;
最大的缓冲大小 为Interger.MaxValue
private static int MAX_BUFFER_SIZE = 2147483639;
缓冲字节数组 是多线程内存可见的
protected volatile byte[] buf;
缓存数组的原子化更新器
这个是和缓冲字节数组byte[] buf配合使用的
以保证缓冲字节数组的原子化更新
也就是说在多线程环境下buf和bufUpdater都具有原子性
private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]>
bufUpdater = AtomicReferenceFieldUpdater
.newUpdater(BufferedInputStream.class, byte[].class, "buf");
此流的缓冲区的有效字节数
protected int count;
此流的缓冲区读入游标位置
protected int pos;
此流的的缓冲区的标记位置
markpos和reset()配合使用才有意义。操作步骤:
调用mark()方法,保存pos的值到markpos中。
通过reset()方法,会将pos的值重置为markpos。
接着通过read()读取数据时,就会从mark()保存的位置开始读取。
protected int markpos;
标记的最大值
protected int marklimit;
成员方法
构造方法
输入一个节点数据源输入流 按默认缓冲区大小构造一个BufferedInputStream
public BufferedInputStream(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
按指定的缓冲区大小构造BufferedInputStream
public BufferedInputStream(InputStream in, int size) {
super(in);
this.markpos = -1;
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
} else {
this.buf = new byte[size];
}
}
获取输入流 缓冲区数据
获取被包装的输入流InputStream input
private InputStream getInIfOpen() throws IOException {
InputStream input = this.in;
if (input == null) {
throw new IOException("Stream closed");
} else {
return input;
}
}
直接获取缓冲区数据 注意是引用传递而不是复制一个新的缓冲数组
private byte[] getBufIfOpen() throws IOException {
byte[] buffer = this.buf;
if (buffer == null) {
throw new IOException("Stream closed");
} else {
return buffer;
}
}
缓冲数组填充方法
这个方法比较复杂 是BufferedInputStream的核心方法
建立缓冲并从被包装的数据源输入流读入就是通过此方法实现的
后面详细解释
private void fill() throws IOException {
…………
}
读入方法
读入下一个字节
public synchronized int read() throws IOException {
if (pos >= count) {
如果下一个读入位置的游标超过了此流的有效字节数
执行缓冲数组填充方法fill() 从被包装的输入流中读入数据到缓冲区
fill();
缓冲区填充后 仍然无新数据 则说明已读完 返回-1
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}
读入下一个字节并写入到指定字节数组byte b[]
off为字节数组b的写入起始位置 len为读入的长度
public synchronized int read(byte b[], int off, int len)
throws IOException
{
检测流是否关闭
getBufIfOpen();
检测是否越界
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int n = 0;
for (;;) {
可以看到这里是用内部方法read1()进行读取的
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
读入了指定的长度 返回
if (n >= len)
return n;
如果被包装的输入流未关闭但是无字节数据 则返回
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}
private int read1(byte[] b, int off, int len) throws IOException {
int avail = this.count - this.pos;
这里使用了一个直接从被包装的输入流读取的机制 如果缓冲区已经读完 且没有进行标记
则直接从被包装的输入流中读入 避免了从被包装的输入流中读入字节到缓冲区
再从缓冲区复制到字节数组b的性能损失过程
这是一个加速机制
if (avail <= 0) {
if (len >= this.getBufIfOpen().length && this.markpos < 0) {
return this.getInIfOpen().read(b, off, len);
}
刷新缓冲区
this.fill();
此时仍无数据 则返回-1 读入结束
avail = this.count - this.pos;
if (avail <= 0) {
return -1;
}
}
如果缓冲区未读完 则使用System.arraycopy从缓冲区复制数据到byte[] b
int cnt = avail < len ? avail : len;
System.arraycopy(this.getBufIfOpen(), this.pos, b, off, cnt);
this.pos += cnt;
return cnt;
}
跳过方法
InputStream的通用方法
public synchronized long skip(long n) throws IOException {
检查流是否关闭
getBufIfOpen();
if (n <= 0) {
return 0;
}
long avail = count - pos;
if (avail <= 0) {
如果缓冲区已经读完 且没有进行标记
则直接让被包装的被包装的输入流跳过n个字节
if (markpos <0)
return getInIfOpen().skip(n);
刷新流缓冲体
fill();
avail = count - pos;
此时无数据则说明已读完 直接返回0
if (avail <= 0)
return 0;
}
缓冲区未读完 则直接跳过n个字节(实际是通过移动读入游标pos来实现的)
long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}
下一个字节是否可以读入方法
public synchronized int available() throws IOException {
int n = count - pos;
实际上调用的是被包装的输入流的available()方法
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}
标记与重置
标记方法 入参是读入的限制 标记则直接标记为当前游标的位置
public synchronized void mark(int readlimit) {
this.marklimit = readlimit;
this.markpos = this.pos;
}
重置此流到标记的位置
public synchronized void reset() throws IOException {
this.getBufIfOpen();
if (this.markpos < 0) {
throw new IOException("Resetting to invalid mark");
} else {
实际就是移动读入游标到标记位置
this.pos = this.markpos;
}
}
是否支持标记 恒定为支持
public boolean markSupported() {
return true;
}
关闭流
public void close() throws IOException {
while(true) {
byte[] buffer = this.buf;
如果缓冲区不为空的话 则循环执行bufUpdater的CAS操作
直到buffer被置空(这块就是检查看是否有其他线程还在操作缓冲区)
if (this.buf != null) {
if (!bufUpdater.compareAndSet(this, buffer, (Object)null)) {
continue;
}
InputStream input = this.in;
this.in = null;
if (input != null) {
input.close();
}
return;
}
return;
}
}
缓冲数组刷新内容 fill()方法
创建BufferedInputStream时,调用构造函数并传入一个来自数据源的输入流参数,读取数据时,BufferedInputStream会将该输入流数据分批读取,每次读取一部分到缓冲中;操作完缓冲中的这部分数据之后,再从输入流中读取下一部分的数据。
因为把数据源输入流的数据读入到BufferedInputStream的缓冲区中 所以此流叫做缓冲字节输入流
使用缓冲区的原因是为了提高读入的效率 缓冲区的数据时存储在内存中的 而被包装的数据源输入流的数据可能是在外存或者网络接口中 相比内存 数据源输入流读入数据的速度可能较慢
至于为什么不一次性全部读入到内存中?如果数据源数据较多 一次读入可能会耗时很久 并且耗费大量内存空间
但如果数据源数据很少 则可以一次读入到内存中 要根据情况决定
fill()方法源码
private void fill() throws IOException {
byte[] buffer = this.getBufIfOpen();
int nsz;
if (this.markpos < 0) {
this.pos = 0;
} else if (this.pos >= buffer.length) {
if (this.markpos > 0) {
nsz = this.pos - this.markpos;
System.arraycopy(buffer, this.markpos, buffer, 0, nsz);
this.pos = nsz;
this.markpos = 0;
} else if (buffer.length >= this.marklimit) {
this.markpos = -1;
this.pos = 0;
} else {
if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
}
nsz = this.pos <= MAX_BUFFER_SIZE - this.pos ? this.pos * 2 : MAX_BUFFER_SIZE;
if (nsz > this.marklimit) {
nsz = this.marklimit;
}
byte[] nbuf = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, this.pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
throw new IOException("Stream closed");
}
buffer = nbuf;
}
}
this.count = this.pos;
nsz = this.getInIfOpen().read(buffer, this.pos, buffer.length - this.pos);
if (nsz > 0) {
this.count = nsz + this.pos;
}
}
-情景1 如果缓冲区中的数据已经被全部读入 且没有进行标记 此时 fill方法相当于
private void fill() throws IOException {
byte[] buffer = this.getBufIfOpen();
int nsz;
this.pos = 0;
this.count = this.pos;
nsz = this.getInIfOpen().read(buffer, this.pos, buffer.length - this.pos);
if (nsz > 0) {
this.count = nsz + this.pos;
}
此情景下 要读入的数据被从数据源输入流复制到流缓冲区 并更新有效数据字符数count
程序运行的情况是 数据源输入流中有较长的数据,程序每次从中读取一部分数据到缓冲区buffer中进行操作。
每次当我们读取完buffer中的数据之后,并且此时输入流没有被标记;
那么,就接着从输入流中读取下一部分的数据到buffer中。
其中,判断是否读完buffer中的数据,是通过 if (pos >= count) 来判断的;
判断输入流有没有被标记,是通过 if (markpos < 0) 来判断的。
理解这个思想之后,我们再对这种情况下的fill()的代码进行分析,就特别容易理解了。
private void fill() throws IOException {
byte[] buffer = this.getBufIfOpen();
int nsz;
判断输入流是否被标记
if (this.markpos < 0) {
未被标记 markpos =-1<0
this.pos = 0;
} else if (this.pos >= buffer.length) {
已经被标记了
……
}
this.count = this.pos;
获取输入流并复制到缓冲区中 读入buffer.length - this.pos个字节 因为此时未被标记
所以就是读入buffer.length个字节
nsz = this.getInIfOpen().read(buffer, this.pos, buffer.length - this.pos);
if (nsz > 0) {
根据从输入流中读取的实际数据的多少,来更新buffer中数据的实际大小
this.count = nsz + this.pos;
}
}
-情景2 缓冲区数据已读完 但是进行了标记
此时 fill方法相当于
private void fill() throws IOException {
byte[] buffer = this.getBufIfOpen();
if (markpos >= 0 && pos >= buffer.length) {
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
此时程序运行的情况是 — — 数据源输入流中有较长的数据,程序每次从中读取一部分数据到缓冲区buffer中进行操作。
当读取完buffer中的数据之后,并且此时输入流存在标记时;
那么,就发生情景2。
此时,需要先将“被标记位置”到“buffer末尾”的数据保存起来,然后再从输入流中读取下一部分的数据到buffer中。
其中,判断是否读完buffer中的数据,是通过 if (pos >= count) 来判断的;
判断输入流有没有被标记,是通过 if (markpos < 0) 来判断的。
判断buffer是否已经被用完,是通过 if (pos >= buffer.length) 来判断的。
理解这个思想之后,我们再对这种情况下的fill()代码进行分析,就特别容易理解了。
注意:情况2进行fill缓冲区刷新后,markpos的值由“大于0”变成了“等于0”!
private void fill() throws IOException {
byte[] buffer = this.getBufIfOpen();
int nsz;
if (this.markpos < 0) {
……
}
下一个读入游标位置超过了buffer长度 说明buffer空间已经用完
else if (this.pos >= buffer.length) {
并且进行了标记
if (this.markpos > 0) {
标定当前读入位置到标记位置的长度nsz
nsz = this.pos - this.markpos;
将buffer中标记位置开始的nsz长度的数据复制到buffer从0位置开始的部分
System.arraycopy(buffer, this.markpos, buffer, 0, nsz);
下一个读入数据游标移动到nsz位置
this.pos = nsz;
标记游标移动到buffer头部
this.markpos = 0;
}
有效数据长度被置为pos的位置 也即使nsz长度的位置
this.count = this.pos;
继续从数据源输入流中读入数据
读入长度为缓冲区减掉标记的长度剩余的空间buffer.length - this.pos
然后复制到buffer中从pos开始的空间
nsz = this.getInIfOpen().read(buffer, this.pos, buffer.length - this.pos);
if (nsz > 0) {
更新count大小数据
this.count = nsz + this.pos;
}
}
- 情况3 读取完buffer中的数据,进行了标记 标记位置为0 但buffer长度超过了标记的限制
此时代码相当于
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos >= 0 && pos >= buffer.length) {
if ( (markpos <= 0) && (buffer.length >= marklimit) ) {
缓冲区buffer长度过大 超过了标记限制marklimit
则标记无效 markpos被置为未标记的-1
markpos = -1;
读入游标置回0
pos = 0;
}
}
count = pos;
从数据源输入流中读入数据并复制到缓冲区buffer中
复制长度为buffer.length - pos 实际上就是buffer.length
int nsz= getInIfOpen().read(buffer, pos, buffer.length - pos);
if (nsz> 0){
count = n + pos;
}
}
- 情况4 读取完buffer中的数据,进行了标记,标记位置为0 并且buffer长度没有超过标记的限制
此时代码等于
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
已经进行了标记 标记位置为0 并且 缓冲区已经读完
if (this.pos >= buffer.length) {
if (this.markpos > 0) {
……
}
else {
如果缓冲区长度超过了缓冲区允许的最大值 抛出内存耗尽异常
if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
}
下面要用的数组长度nsz 其的大小是“pos*2”和“marklimit”中较小的那个数
这其实是一个扩容操作 因为标记的位置是0 所以要把缓冲区中所有有效数据都保存起来
所以缓冲区要进行扩容 既可以容纳新读入的数据 又要保存之前的数据以预备重置读入游标
int nsz = this.pos <= MAX_BUFFER_SIZE - this.pos ? this.pos * 2 : MAX_BUFFER_SIZE;
if (nsz > this.marklimit) {
nsz = this.marklimit;
}
新建新缓冲数组nbuf 长度为nsz
byte[] nbuf = new byte[nsz];
将数据从旧的缓冲数组buffer中复制到新的数组nbuf 中
System.arraycopy(buffer, 0, nbuf, 0, this.pos);
CAS比较和替换 检查buffer是否是期望值
作用是多线程下检查流是否关闭
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
throw new IOException("Stream closed");
}
buffer替换为nbuf
buffer = nbuf;
}
}
count = pos;
从数据源输入流读入新数据
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
注意:在这里,我们思考一个问题,“为什么需要marklimit,它的存在到底有什么意义?”我们结合“情况2”、“情况3”、“情况4”的情况来分析。
假设,marklimit是无限大的,而且我们设置了markpos。当我们从输入流中每读完一部分数据并读取下一部分数据时,都需要保存markpos所标记的数据;这就意味着,我们需要不断执行情况4中的操作,要将buffer的容量扩大……随着读取次数的增多,buffer会越来越大;这会导致我们占据的内存越来越大。所以,我们需要给出一个marklimit;当buffer>=marklimit时,就不再保存markpos的值了。
情况5:除了上面4种情况之外的情况
代码等于
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
直接从输入流读取部分新数据到buffer中
BufferedOutputStream
概述
BufferedOutputStream继承于FilterOutputStream,
FilterOutputStream 从名字上就可以看出 是个过滤流 类似于FilterWriter
用来“封装其它的输出流,并为它们提供额外的功能”。
BufferedOutputStream的作用就是为“输出流提供缓冲功能,允许每次写入一批数据到底层输出流中
而不是每次都调用底层节点流操作每一个字节
源码解析
成员函数
字节缓冲数组 byte[] buf
protected byte[] buf;
有效数据大小
protected int count;
成员方法
构造方法
传入一个节点输入流 创建一个默认缓冲大小的BufferedOutputStream
public BufferedOutputStream(OutputStream out) {
this(out, 8192);
}
创建指定缓冲大小的BufferedOutputStream
public BufferedOutputStream(OutputStream out, int size) {
super(out);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
} else {
this.buf = new byte[size];
}
}
刷新并写入底层流方法
private void flushBuffer() throws IOException {
if (this.count > 0) {
调用write方法 缓冲区数据全部写入底层输出流
this.out.write(this.buf, 0, this.count);
this.count = 0;
}
}
写出方法
写出方法 参数为整型 写出的时候被转换
public synchronized void write(int b) throws IOException {
如果有效数据较多 则先写出到底层输出流
if (this.count >= this.buf.length) {
this.flushBuffer();
}
this.buf[this.count++] = (byte)b;
}
public synchronized void write(byte[] b, int off, int len) throws IOException {
若写入长度大于缓冲区大小,则先将缓冲中的数据写入到输出流,然后直接将数组b写入到底层输出流中
if (len >= this.buf.length) {
this.flushBuffer();
this.out.write(b, off, len);
}
若剩余的缓冲空间 不足以 存储即将写入的数据,则先将缓冲中的数据写入到底层输出流中
然后将写入数据存储到缓冲数组中
else {
if (len > this.buf.length - this.count) {
this.flushBuffer();
}
System.arraycopy(b, off, this.buf, this.count, len);
this.count += len;
}
}
刷新方法 执行写入操作
public synchronized void flush() throws IOException {
this.flushBuffer();
this.out.flush();
}