该类无引入包
继承自Reader类
该类的类头注释如下:(我见过最简洁的
Piped character-input streams.
大意如下:
管道字符输入流
该类含有如下的成员变量:
对应得输出流标志位(是否关闭
boolean closedByWriter = false;
对应读取流关闭标志位(就是自己,感觉这里也该加上volaile
boolean closedByReader = false;
连接状态
boolean connected = false;
读取线程
Thread readSide;
写入线程
Thread writeSide;
默认缓冲区大小
private static final int DEFAULT_PIPE_SIZE = 1024;
缓冲区
char buffer[];
写入偏移量
int in = -1;
读取偏移量
int out = 0;
该类含有如下的成员方法:
构造函数(与给定的写出管道链接,使用默认大小缓冲区
public PipedReader(PipedWriter src) throws IOException { this(src, DEFAULT_PIPE_SIZE); }
构造函数(与给定的写出管道连接,使用指定大小缓冲区
public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); }
默认构造函数(暂无链接,默认大小
public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); }
构造函数(暂无链接,给定大小
public PipedReader(int pipeSize) { initPipe(pipeSize); }
管道初始化(初始化缓冲区
private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe size <= 0"); } buffer = new char[pipeSize]; }
连接管道
public void connect(PipedWriter src) throws IOException { src.connect(this); }
接收单个字符数据进管道
synchronized void receive(int c) throws IOException { if (!connected) {//检查连接状态 throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) {//检测是否有一段关闭 throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) {//检测读取线程活动性 throw new IOException("Read end dead"); } writeSide = Thread.currentThread(); while (in == out) {//当前缓冲区已满,暂时无法写入 if ((readSide != null) && !readSide.isAlive()) { throw new IOException("Pipe broken"); } /* full: kick any waiting readers */ notifyAll();//唤醒所有挂起线程(读取线程 try { wait(1000);//挂写入线程 } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } if (in < 0) {//缓冲为空 in = 0; out = 0; } buffer[in++] = (char) c;//存写入数据 if (in >= buffer.length) {//循环数组修改偏移量 in = 0; } }
接收字符数组(对上一个函数的循环调用
synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= 0) { receive(c[off++]); } }
当写入端数据全部传输完后,唤醒读取线程,关闭管道
synchronized void receivedLast() { closedByWriter = true; notifyAll();//唤醒所有读取线程 }
读取字符数据
public synchronized int read() throws IOException { if (!connected) {//管道连接状态 throw new IOException("Pipe not connected"); } else if (closedByReader) {//Reader端是否关闭 throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive()//检测现在是否有程序在写入 && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread();//写入线程绑定 int trials = 2;//两次机会 while (in < 0) {//当前为空 if (closedByWriter) {//且写入端已关闭 /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll();//唤醒所有线程(写线程 try { wait(1000);//睡读取线程 } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++];//读取数据 if (out >= buffer.length) { out = 0; } if (in == out) {//读取完毕,缓冲区为空 /* now empty */ in = -1; } return ret;//返回字符的UTF-8 }
读取出字符数组
public synchronized int read(char cbuf[], int off, int len) throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if ((off < 0) || (off > cbuf.length) || (len < 0) ||//下表有效性检查 ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read();//调用上个函数获得数据 if (c < 0) {//判定缓冲区是否为空 return -1; } cbuf[off] = (char)c;//写入数组 int rlen = 1;//实际读取出的长度 while ((in >= 0) && (--len > 0)) {//循环读取 cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; }
检查当前管道是否可以进行数据读取
public synchronized boolean ready() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if (in < 0) { return false; } else {//管道有效且缓冲区不为空 return true; } }
关闭当前管道
public void close() throws IOException { in = -1;//清空缓冲区 closedByReader = true;//reader端关闭 }
该类与PipedInputStream类基本一样,只是在操作的时候加入char类型的强制转码,该类一些需要注意的地方与PipedInputStream相同,如有需要请查阅。