Okio 首次接触是在 OkHttp 中,它作为 OkHttp 的底层io库而出名。它是对 IO 流进行了封装,进行优化;Okio可以单独使用,其效率比 IO 要高,举个写入和读取的例子,往一个文本中写入数据和从文本中读取数据
IO 流:
public static void writeTest(File file) {
try {
FileOutputStream fos = new FileOutputStream(file);
OutputStream os = new BufferedOutputStream(fos);
DataOutputStream dos = new DataOutputStream(os);
dos.writeUTF("write string by utf-8.\n");
dos.writeInt(1234);
dos.flush();
fos.close();
} catch (Exception e) {
e.printStackTrace();
}
}
Okio:
public static void writeTest1(File file) {
Okio
.buffer(Okio.sink(file))
.writeUtf8("abcdefghijklmn \n")
.writeUtf8("中国 \n")
.writeUtf8("1234")
.close();
} catch (Exception e) {
e.printStackTrace();
}
}
读取文本内容,打印
public static void readLines(File file) {
try {
BufferedSource bufferedSource = Okio.buffer(Okio.source(file));
String line;
while ((line = bufferedSource.readUtf8Line()) != null){
System.out.println(line);
}
bufferedSource.close();
} catch (Exception e) {
e.printStackTrace();
}
}
以上方法写的比较粗糙,由于 IO 流不是重点,所以只写了一个方法作为例子。我们知道直接使用 FileOutputStream 流的话,因为读写都是一个字节,所以效率比较低,为了提高效率,java 提供了 BufferedOutputStream 缓冲流,这样就可以缓冲一批数据,然后一起写入文本中;IO 流模块采用了装饰模式,我们可以看到,写出来后代码相对雍容,同样的功能用 Okio 来实现的话,一行代码就可以了;上面的例子是为了更好的理解所以才用多行来展示。
先来看看 Okio.sink(file) 返回的是什么,
public static Sink sink(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return sink(new FileOutputStream(file));
}
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
return new Sink() {
@Override // 方法 A
public void write(Buffer source, long byteCount) throws IOException {
...
}
@Override public void close() throws IOException {
out.close();
}
...
};
}
Sink 是个接口,Okio.sink(file) 中创建了 FileOutputStream(file) ,然后传递到 sink(final OutputStream out, final Timeout timeout) 中,返回了实现具体方法的 Sink 对象,这里面重点注意 write() 方法,我们标记为 方法 A。
Okio.buffer(sink) 中的代码
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
它返回了 RealBufferedSink 对象,BufferedSink 和 RealBufferedSink 都实现了 Sink 接口; writeUtf8("abcdefghijklmn \n") 方法及下面的两个同样的方法,对应的代码是 RealBufferedSink 内部的 writeUtf8() 方法,
@Override
public BufferedSink writeUtf8(String string) throws IOException {
if (closed) throw new IllegalStateException("closed");
// 方法 B
buffer.writeUtf8(string);
return emitCompleteSegments();
}
注意方法 B,它调用的是 Buffer 中的方法,
@Override
public Buffer writeUtf8(String string) {
return writeUtf8(string, 0, string.length());
}
@Override
public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
...
// Transcode a UTF-16 Java String to UTF-8 bytes.
for (int i = beginIndex; i < endIndex;) {
int c = string.charAt(i);
if (c < 0x80) {
Segment tail = writableSegment(1);
byte[] data = tail.data;
int segmentOffset = tail.limit - i;
int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
while (i < runLimit) {
c = string.charAt(i);
if (c >= 0x80) break;
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
}
int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).
tail.limit += runSize;
size += runSize;
}
...
}
return this;
}
这里简化了代码,我们知道,一个汉字是两个或者三个字符,一个字母占一个字符,这里统一当做字母来分析,以 "abcdefghijklmn \n" 为例,beginIndex 为0,endIndex 为16,for循环中,c 是字符串中该位置对应的字符,用 ASCII 码的十进制展示数据, 0x80 是ASCII 中的边界,我们只看它里面的,生成一个 Segment 对象,Segment 是什么,稍后再分析,现在只需要知道它包含字节数组对象即可,tail.limit 和 i 初始值都是零,此时 segmentOffset 为0,runLimit 值为16,下一步则把 c 转换为字节,添加到 data 数组中,注意数组的索引值: segmentOffset + i++ ,它是0,但赋值后,i则变为了1,再往下,是个 while 循环,与上面一样,segmentOffset 一直为0,i 不停的增加,就这样,"abcdefghijklmn \n" 被转换为字节,填充到了 data 数组中。此时,runSize 值为 16,然后把它赋值给 Segment 对象中的 limit 属性,同时 size 值也由0的基础上加了16,变为16;后面的writeUtf8()和之前的一样,原理类似,最终把字符串转换为字节,添加到 Segment 的 data 数组中,用 limit 标记写入的位置, Buffer 中 size 记录的是本次写入的字节个数。
接着看 emitCompleteSegments() 方法
@Override
public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
Buffer
public long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
return result;
}
此时由于写入的字符串比较短,Segment 只有一个,它本身是个链表,后面再分析,tail.limit - tail.pos 这两个参数,pos 是0,result 和 limit 是相同的,所以运算后,result 值为0,所以 byteCount 的值为0,进入不到后面的if判断中。此时再看 Okio 的最后一个方法 close()
@Override
public void close() throws IOException {
if (closed) return;
Throwable thrown = null;
try {
if (buffer.size > 0) {
sink.write(buffer, buffer.size);
}
} catch (Throwable e) {
thrown = e;
}
try {
sink.close();
} catch (Throwable e) {
if (thrown == null) thrown = e;
}
closed = true;
if (thrown != null) Util.sneakyRethrow(thrown);
}
这个方法中其实就两点,一个是判断 if (buffer.size > 0) 去执行 sink.write(buffer, buffer.size) 方法,一个就是完毕后,执行 sink.close() 方法,这里的 sink 其实就是方法 A 中的 new Sink(),看看它自定义的方法
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
第一行是校验,看while循环中,获取到头部的 Segment,然后计算出要拷贝的值,这里 byteCount 和 head.limit - head.pos 的值是相等的,然后就是 out.write(head.data, head.pos, toCopy) 方法,这一步调用的是 IO 流操作,我们应该很眼熟,然后计算 pos 、size 等值,此时 head.pos 和 head.limit 相等,Segment 被回收,放到 SegmentPool 池子中供复用,写入的逻辑到此基本结束,剩下的 sink.close() 方法是把 out 这个流给关闭。
readLines(File file) 方法中是读取文本的信息,对应的是 RealBufferedSource 类,它与写的原理一样,一次性把大量的内容读取出来,放到 Segment 的数组中,然后根据需要,吐出相应的内容。我们知道 FileOutputStream 每次写入一个字节,效率很低,为了提高效率,发明了 BufferedOutputStream 缓冲流,缓冲一批数据一次写入;Segment 的 data 数组就是缓冲数据,然后通过 IO 流一次性写入文本,这也是 Okio 效率高的原因,如果仅仅是这样,它也就是类似缓冲流,那么它还有什么亮点呢?我们看看它
final class Segment {
static final int SIZE = 8192;
static final int SHARE_MINIMUM = 1024;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
Segment(byte[] data, int pos, int limit) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.owner = false;
this.shared = true;
}
}
byte[] data 就是用来存储数据的数组,pos 是写入数据的开始位置,limit 是读取数据的开始位置,shared 和 owner 这是两个辅助属性,在构造方法中可以看到它们的赋值,中点看看 next 和 prev 属性,从这里可以看出, Segment 支持链表操作,并且是双链表,我们可以看看它的添加和移除方法
public Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
public Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
pop() 方法是删除循环链接列表的此段并返回其后继,这个和 LinkedList 的删除自身不太一样,LinkedList 中删除返回的是对应的元素,而 Segment 中,假设 Segment A 的next 属性时 Segment B,则 A.pop() 方法返回的值是 B,同时A从它的链表中移除了。push() 方法是往链表中添加一个对象,还是上面的例子,push(C) 后,A的next 是 C,C 的next 是 B。
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
先看 writeTo() 方法,意思是把当前 Segment 的 data 中的部分或全部内容,写入 sink 对象中;此处会判断要写入对象 sink 中输入的位置,如果加上要写入内容的大小 byteCount,超过了最大 值,则想办法对它里面的内容整体移动,如果它是被分享的,不能移动;sink.limit - sink.pos 的值是该 data 数组中内容的小小,如果剩余的大小比 byteCount 小,则说明位置不够;System.arraycopy() 是数组拷贝方法,此时把数组中的内容整体移动到从零开始,同时 limit 和 pos 的值都相对应的减小。举个简单的例子,比如说data 此时的值为{0,0,1,2,3,4,0,0},此时 limit 是5,pos 是2,那么经过 System.arraycopy() 后,data 变为 {1,2,3,4,0,0,0,0},limit是3,pos是0,这个if中判断的内容是先对 sink 对象进行内容位移,优化出连贯的空间,供下一步使用。剩下的还是复制,从当前 data 复制到 sink 的 data 中,同时改变 sink 的 limit 和当前的 pos 值。
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
compact() 方法是指优化自己的内存,习惯性判断,如果 prev == this 说明当前只剩下自己了,没办法再优化;这里为什么会说只剩下自己了,后面再分析。如果 shared 为 true,data 里面的内容不能向前移动,所以计算出当前 data 中可用的剩余值availableByteCount,byteCount 是要移动的内容的长度,如果可用长度大于或等于移动长度,才能移动,这里通过 writeTo(prev, byteCount) 方法把自身 data 中可用的内容复制到 prev 中,也就是它的前一个 Segment 中,然后调用 pop() 方法切断前后的链表,最后通过 SegmentPool 回收。
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM) {
prefix = new Segment(this);
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
上面两个方法分析完了,再看 split() 方法就简单了,首先是数据校验,然后是判断 byteCount 的值,如果大于了 SHARE_MINIMUM 的值,则直接new一个Segment,注意它的构造方法,此时当前 Segment 和 prefix 公用一个 data,shared 值为 true,这也是为什么 data 不能移动的原因,因为会导致错乱;byteCount 较小时,会从 SegmentPool 池子中获取一个对象
,然后对data进行复制;接着就是修改 limit 和 pos 的值,然后就是通过 prev.push(prefix) 把 prefix 添加到当前链表中。
final class SegmentPool {
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
static Segment next;
static long byteCount;
private SegmentPool() {
}
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment();
}
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return;
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return;
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
SegmentPool 代码比较简单,take() 方法和 Message 的 obtain() 方法比较类似,next 会形成一条单链表,如果 next 为null,则创建一个 Segment,否则从链表中头部开始取值,这里 next 这个对象,一直是头部 head,每取出一个,byteCount 总量就减少Segment.SIZE;recycle() 方法是指回收 Segment 对象,回收的 Segment 对象只能是单个,不能还在链表中,同时 shared 为 true 的不能会回收;注意 synchronized 代码块里面的内容,这里会对 byteCount 大小做个限制,一旦超过了 MAX_SIZE,即八个 Segment 对象,池子中就不接受新的了。回收时,会用 byteCount 记录大小,pos 和 limit 都清零,会把当前 next 对象作为回收 segment 对象的next属性,然后把当前 segment 赋值给 next,这样就形成了一条链表。
这些东西看完了,再回过头来看看 Buffer 中的 writableSegment() 方法,看看它里面是怎么创建 Segment 对象的
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
首次调用这个方法,head 为null,此时创建了一个 Segment 赋值给 head,注意 return 中的代码, head.next = head.prev = head 意思是next 和 prev 都指向自己;往下看,获取 head 的前一个 Segment,如果head只有自己的话,那么前一个也是自己,注意if条件,如果需要添加一个新的 Segment,调用 push(SegmentPool.take()),此时,head 的链表中有两个 Segment 对象了,但他们是个死循环,收尾呼应相连接,如果我们用 while 循环,根据 next 不为null判断,它会无限循环下去,这样也就解释的通 Segment 的 compact() 方法中为什么判断 prev == this 就跑异常,因为链表中只有它自己了,没办法也没必要压缩了。