线程之间通信的三种方式:
(1)共享内存。
(2)wait()/notify()方法。
(3)管道输入输出流。
本篇将介绍第三种方式,使用管道输入输出流进行线程间的通信。
管道输入/输出流的思想:
Java里的管道输入流PipedWriter与管道输出流PipedReader实现了类似管道的功能,用于不同线程之间的相互通信。
java的管道输入与输出实际上使用的是一个循环缓冲数组来实现,这个数组默认大小为1024字节。读线程使用PipedReader从这个循环缓冲数组中读数据,写线程使用PipedWriter往这个循环缓冲数组中写入数据。
(1)当这个缓冲数组已满的时候,写线程将阻塞;
(2)当这个缓冲数组为空的时候,读线程将阻塞;
这其实就是生产者消费者模式的实现。
基本流程如下图所示:
从上图中可以看出,最关键的地方就是对PipedReader中的缓冲区的控制。
下面是PipedReader的成员变量:
//缓冲区默认的大小
private static final int DEFAULT_PIPE_SIZE = 1024;
//缓冲区
char buffer[];
//写线程将要写入的字符数据在缓冲区的索引位置,in<0表示缓冲区为空,in==out表示缓冲区满
int in = -1;
//读线程将要读取的字符数据在缓冲区的索引位置。
int out = 0;
成员变量in用来控制写线程写入缓冲区的位置,顺序自增。而out控制读线程读取缓冲区数据的索引位置,顺序自增。
缓冲区的状态:
(1)缓冲区为空
缓冲区为空包括从未有写线程向缓冲区输入数据,还有一种情况即读线程将写线程写的数据都读取完毕,这两种情况都是缓冲区为空的状态。如下图所示:
当缓冲区为空的时候,in=1,out=0,此时需要阻塞读线程,通知写线程继续向缓冲区写入数据。
(2)缓冲区满
由于使用的缓冲区是用循环数组实现的,即写线程写到数组的末尾时可以从索引0开始继续写数据,直到遇到读线程的索引位置out,此时缓冲区满了。如下图所示:
当缓冲区满了,就需要阻塞写线程,唤醒读线程继续读取数据。
源码分析
PipedReader的read方法。
public synchronized int read() throws IOException {
//1.是否建立管道连接
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
//2.输出管道是否关闭,调用close()方法关闭输出管道
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
//3.判断写线程是否还存活
throw new IOException("Write end dead");
}
//4.获得当前读线程的引用
readSide = Thread.currentThread();
int trials = 2;
while (in < 0) {
//5.如果缓冲区为空
if (closedByWriter) {
/* 写管道关闭 ,返回-1*/
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
//6.唤醒写线程继续向缓冲区写数据
notifyAll();
try {
//7.释放锁,将读线程加入到等待队列
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
//8.执行读取操作
int ret = buffer[out++];
//9.如果读取索引位置大于等于缓冲区最大容量,从头读取
if (out >= buffer.length) {
out = 0;
}
//10.如果读取完所有的缓冲数据,缓冲区为空,就重置读取索引位置。
if (in == out) {
/* now empty */
in = -1;
}
return ret;
}
PipedWriter的write()方法
public void write(int c) throws IOException {
//1.PipedReader sink
if (sink == null) {
throw new IOException("Pipe not connected");
}
//2.使用管道输入流PipedReader的receive方法向缓冲区输入数据。
sink.receive(c);
}
PipedReader的receive方法
synchronized void receive(int c) throws IOException {
//1.管道是否连接
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
//2.管道是否关闭
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
//3.读线程是否存活
throw new IOException("Read end dead");
}
//4.获取当前写线程的引用
writeSide = Thread.currentThread();
while (in == out) {
//5.缓冲区满
if ((readSide != null) && !readSide.isAlive()) {
throw new IOException("Pipe broken");
}
/* full: kick any waiting readers */
//6.唤醒读线程读取
notifyAll();
try {
//7.将写线程阻塞
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
//8.如果缓冲区为空,重置in和out
if (in < 0) {
in = 0;
out = 0;
}
//9.写入数据
buffer[in++] = (char) c;
//10。如果写入索引位置大于等于缓冲区最大容量,从头写入
if (in >= buffer.length) {
in = 0;
}
}
管道输出/输入流实现线程通信
下面给出使用PipedReader和PipedWriter实现线程之间通信的代码:
public class Test {
public static void main(String[] args) throws IOException {
PipedWriter out=new PipedWriter();
PipedReader in=new PipedReader();
out.connect(in);
new Thread(new Reader(in)).start();
new Thread(new Writer(out)).start();
}
}
class Reader implements Runnable{
PipedReader pd;
public Reader(PipedReader pd){
this.pd=pd;
}
@Override
public void run() {
// TODO Auto-generated method stub
int rs=0;
try {
while((rs=pd.read())!=-1){
System.out.print((char)rs);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Writer implements Runnable{
PipedWriter pw;
public Writer(PipedWriter pw){
this.pw=pw;
}
@Override
public void run() {
// TODO Auto-generated method stub
int rs=0;
try {
while((rs=System.in.read())!=-1){
pw.write(rs);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}