- 之前,因为学习HDFS,专门梳理了基于的文件的输入/输出流:基于文件的java输入/输出流
- 学习并发编程关于如何实现线程间通信,就有介绍使用管道流实现线程间通信
- 管道字节流:PipedInputStream、PipedOutputStream,管道字符流:PipedWriter、PipedReader
- 本文将基于管道字节流,来学习如何使用管道流
1. 管道的理解
1.1 管道
- 管道,直接看成一节水管就行。
- 水管想要使用起来,必须有一个入口往其中灌水,然后水管中的水从出扣流出供人使用
- 管道的入口和出口是一一对应的,要想其中的数据发生流动
- 一般,一个线程往管道写入数据、另一个线程从管道读取数据,这样管道就能实现线程间的通信了
1.2 管道入口和出口应该使用哪种流?
- 我是个微胖人士,平时管不住嘴、迈不开腿
- 如果让我举个例子帮助大家理解管道流,那我可能会举老家原始的灌香肠的方法
- 家用绞肉机,有个类似喇叭的装肉入口,有个类似竹筒的出口
- 在竹筒上套上猪小肠,被绞碎的肉一出来就会逐渐将小肠灌满
- 从绞肉机本身就是一个管道,从入口塞肉、出口出肉
- 对于在入口处往管道塞肉的父亲来说,肉从他手里出去的,他是一个输出流(将数据往外推送)
- 对于在出口处装肠的母亲来说,肉到了小肠中,小肠一个输入流(接收别人推送来的数据)
总结
- 管道流中,入口处是输出流,需要输出流往管道中写数据
- 管道流中,出口处是输入流,需要输入流从管道中消费(读)数据
- 其实吧,我估计不只是管道流,其他的输入输出流,可能刚学时大家都会有点混淆
- 啥时候用输入流,啥时候用输出流?
- 我自己的诀窍就是:往某个设备写数据时,用输出流;从某个设备读数据时,用输入流
- 输入,是从数据的接收者来说的,对他来说数据是
入
;输出,针对数据的发送者来说的,对他来说数据是出
2. 管道字节流
- 本小节中,为了简单,管道字节输出流 → \rightarrow → 管道输出流,管道字节输入流 → \rightarrow → 管道输入流
2.1 关于connect方法
PipedOutputStream中的connect方法
-
刚才的讲解中提到,管道必须同时必须出口和入口,否则管道中的数据无法流动起来
-
此时,需要将管道输出流与管道输入流进行关联,已构成类似生产者/消费者的模式
-
PipedOutputStream
提供了connect方法,用于将管道输出流(源头)与管道输入流(尽头)关联PipedOutputStream
中,sink用于记录已经与该管道输出流关联的管道输入流PipedInputStream
中,connected用于标识该管道输入流是否已经与某个管道输出流关联- 若该管道输出流已经和其他管道输入流关联(sink != null),或传入的管道输入流已经和其他管道输出流关联(snk.connected),则二者关联失败,抛出 IOException 异常
- 否则,同时更新管道输入和管道输出流中相关的记录字段(sink、connected)
public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
-
除此之外,connect() 方法还被synchronized修饰,保证多线程使用下,只能有一个管道输入流和管道输出流绑定
PipedInputStream中的connect方法
-
PipedInputStream也提供了connect方法,实际调用的是管道输出流的connect方法实现关联
public void connect(PipedOutputStream src) throws IOException { src.connect(this); }
-
也就是说,不仅可以通过管道输出流的connect方法实现绑定,还可以通过管道输入流的connect方法实现绑定。
-
二者,实质都是通过管道输出流的connect方法实现绑定的
自己的理解:
- 不同的人对输入和输出的绑定关系存在不同的看法
- 我可能倾向于说先有输出才有输入(
out.connect(in)
) - 但有的人可能倾向于将输入绑定到输出上(
in.connect(out)
)
2.2 如何通过管道流实现线程通信?
-
根据以上的学习,通过管道流实现线程通信的步骤如下:
- 创建管道输出流、管道输入流,并通过connect实现绑定
- 在发送者线程中,通过输出流向管道写数据;在接收者线程中,通过输入流从管道读数据
- 启动发送者和接收者线程,实现两个线程间的通信
-
创建管道输出流、管道输入流,并将二者关联;使用
in.connect(out);
实现关联也是一样的// 创建管道输入输出流 PipedOutputStream out = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(); // 管道流相关联 out.connect(in);
-
定义发送者接收者线程:发送者,打印并发送3个UUID;接收者,接收并打印UUID
class SendMessage implements Runnable { private PipedOutputStream outputStream; public SendMessage(PipedOutputStream outputStream) { this.outputStream = outputStream; } @Override public void run() { try { for (int i = 0; i < 3; i++) { String str = UUID.randomUUID().toString(); System.out.println("sender: " + str); outputStream.write(str.getBytes(StandardCharsets.UTF_8)); } } catch (IOException exception) { exception.printStackTrace(); } finally { try { outputStream.close(); } catch (IOException exception) { exception.printStackTrace(); } } } } class ReceiveMessage implements Runnable { private PipedInputStream inputStream; public ReceiveMessage(PipedInputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { int len; byte[] data = new byte[36]; try { while ((len = inputStream.read(data)) != -1) { System.out.println("receiver: " + new String(data, 0, len)); } } catch (IOException exception) { exception.printStackTrace(); } finally { try { inputStream.close(); } catch (IOException exception) { exception.printStackTrace(); } } } }
-
通过线程池,启动线程、实现线程间的通信
// 创建线程实现管道流读取 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new SendMessage(out)); executorService.submit(new ReceiveMessage(in)); // 执行完后,关闭线程池 executorService.shutdown();
不能反复关联
- connect方法中,对于管道输出流或管道输入是否已经关联是一刀切的
- 即使是已关联的管道输出流和管道输入流,再次关联也不行
- 也就是说,对同一对管道输出流和管道输入流,不能多次使用connect方法做关联
- 下面的代码将抛出异常:
public static void repeatConnectTest() throws IOException { // 创建管道输入输出流 PipedOutputStream out = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(); // 管道流相关联 out.connect(in); in.connect(out); // 创建线程实现管道流读取 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new SendMessage(out)); executorService.submit(new ReceiveMessage(in)); // 执行完后,关闭线程池 executorService.shutdown(); }
2.3 通过构造函数实现关联
-
为了简单起见,除了可以通过connect方法实现管道输出流和管道输入流的关联外,Java还支持直接通过构造函数实现关联
-
实质: 通过PipedOutputStream的connect实现关联
-
PipedOutputStream的构造函数如下,支持传入一个管道输入流,然后调用自身的connect方法实现关联
public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); }
-
PipedInputStream也支持通过构造函数直接与管道输出流绑定,其中
pipeSize
是管道缓冲区的大小,默认为1024public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); }
-
因此,还可以如下代码也能基于管道实现线程间的通信
public static void constructorTest() throws IOException { // 创建管道输入输出流 /* PipedOutputStream out = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(out);*/ PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(in); // 创建线程实现管道流读取 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new SendMessage(out)); executorService.submit(new ReceiveMessage(in)); // 执行完后,关闭线程池 executorService.shutdown(); }
3. 管道字符流
-
管道字符流与管道字节流也是具有同样的机制:
- PipedWriter中提供
synchronized
的connect方法,实现PipedReader的关联 - PipedReader中也提供connect方法,实际调用PipedWriter的connect方法关联
- PipedWriter和PipedReader都支持通过构造函数直接进行关联
- PipedWriter中提供
-
基于管道字符流的代码示例如下:
public static void main(String[] args) throws IOException { // 创建管道字符流 PipedWriter writer = new PipedWriter(); PipedReader reader = new PipedReader(); writer.connect(reader); // 创建线程实现管道流读取 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new SendMessage1(writer)); executorService.submit(new ReceiveMessage1(reader)); // 执行完后,关闭线程池 executorService.shutdown(); } class SendMessage1 implements Runnable { private PipedWriter writer; public SendMessage1(PipedWriter writer) { this.writer = writer; } @Override public void run() { try { for (int i = 0; i < 3; i++) { String str = UUID.randomUUID().toString(); System.out.println("sender: " + str); writer.write(str.toCharArray()); } } catch (IOException exception) { exception.printStackTrace(); } finally { try { writer.close(); } catch (IOException exception) { exception.printStackTrace(); } } } } class ReceiveMessage1 implements Runnable { private PipedReader reader; public ReceiveMessage1(PipedReader reader) { this.reader = reader; } @Override public void run() { try { char[] chs = new char[256]; int len; while ((len = reader.read(chs)) != -1) { System.out.println("receiver:" + new String(chs, 0, len)); } } catch (IOException exception) { exception.printStackTrace(); } finally { try { reader.close(); } catch (IOException exception) { exception.printStackTrace(); } } } }
4. 总结
4.1 一定要多线程间通信吗?
-
有资料说管道流适合实现线程间通信,并不是限定说管道输入流和管道输出流必须在不同的线程中调用
-
可以在同一个线程中,向管道写入数据,再从管道读出数据
-
就像在同一个线程中,先写文件、再读文件一样
public static void main(String[] args) throws IOException { // 创建管道字符流 PipedWriter writer = new PipedWriter(); PipedReader reader = new PipedReader(); writer.connect(reader); // 输出 try { for (int i = 0; i < 3; i++) { String str = UUID.randomUUID().toString(); System.out.println("sender: " + str); writer.write(str.toCharArray()); } } catch (IOException exception) { exception.printStackTrace(); } finally { try { writer.close(); } catch (IOException exception) { exception.printStackTrace(); } } // 输入 try { char[] chs = new char[256]; int len; while ((len = reader.read(chs)) != -1) { System.out.println("receiver:" + new String(chs, 0, len)); } } catch (IOException exception) { exception.printStackTrace(); } finally { try { reader.close(); } catch (IOException exception) { exception.printStackTrace(); } } }
-
但其实这样做的意义并不大,除非同一个线程需要有这样的先后处理逻辑
-
更多的,还是不同线程间文件的读写更加具有应用意思
4.2 小结
-
对管道流的学习,非常简单,并未深入
- 管道流的入口和出口分别应该是什么类型的流
- 如何关联入口的管道输出流和出口的管道输入流? 输出流的connect方法是基础
- 简单的编程示例,通过管道字节流和管道字符流如何实现线程间通信
-
参考文档: