概念
- 同步: 线程在发起调用时, 需要等待任务处理结果, 才能继续干别的事情
用户去收快递, 快递小哥哥不在, 等在这里, 直到快递小哥哥回来, 给一个结果
- 异步: 线程在处理任务时, 不需要等待任务结果, 当任务完成后会主动通知线程
用户去收快递, 快递小哥哥不在, 贴个纸条, 快递小哥哥回来看到, 主动给用户打电话, 告知快递结果
- 阻塞: 在读取处理套接字时, 线程将会被挂起, 直到数据读取完成
去收快递, 快递还没到, 就等在那里, 什么事情也干不了, 直到收到快递为止
- 非阻塞: 在读取套接字时, 无论有无数据, 函数会立即返回
去收快递, 快递还没到, 先去干别的事情, 等会再来看看
同步阻塞IO
Java中提供的标准IO输入输出, 同步IO, 一个数据流即需要一个线程去读取, 此时, 线程将会等待数据流准备时间, 如果未准备好, 则需要一直阻塞, 可以采取线程池的方式对该读取方式进行优化
@Test
fun testBlockingIO() {
val serverSocket = ServerSocket(9000)
println("server: 服务端启动,等待连接...")
// 启动客户端线程
Thread {
println("client: 开始连接服务端...")
// 指定服务端地址及端口
val socket = Socket("localhost", 9000)
val input = DataInputStream(socket.getInputStream())
val output = DataOutputStream(socket.getOutputStream())
println("client: 处理3秒钟")
Thread.sleep(3000)
println("client: 发送消息")
output.writeUTF("hello server!")
output.flush()
println("client: 接收数据[${input.readUTF()}]")\
}.start()
// 服务器阻塞5秒钟, 等待客户端连接
val socket = serverSocket.accept()
println("server: 有客户端连接了")
val input = DataInputStream(socket.getInputStream())
val output = DataOutputStream(socket.getOutputStream())
println("server: 接收数据[${input.readUTF()}]")
println("server: 服务处理5秒钟...")
Thread.sleep(5000L)
println("server: 发送消息")
output.writeUTF("hello client!")
output.flush()
}
运行结果
server: 服务端启动,等待连接...
client: 开始连接服务端...
server: 有客户端连接了
client: 处理3秒钟
client: 发送消息
server: 接收数据[hello server!]
server: 服务处理5秒钟...
server: 发送消息
client: 接收数据[hello client!]
上述程序, 在执行 readLine() 读取的时候, 当还未传输完数据时, 线程均处于等待状态, 直到有数据过来
有同学可能在写IO操作的时候, 会遇到读取不到数据的状况, 一直阻塞, 解决办法可以有如下几种:
- 中断输入流, 即
socket.shutdownOutput()
, 此时, 从该socket里读取任何内容都是-1, 即认为达到流的结尾, 程序返回. 此种方法固然可以解决这个问题, 但是却造成socket只通讯了一次就关闭了, 无法实现持续的通讯 - 利用结尾字符标记的方式, 如示例代码中, 读取采用readLine(), readLine() 当读取到 \n 字符会直接返回, 所以在发送的时候在结尾增加了\n字符, 这时即可实现循环消息读取
- 原理同上, 使用
DataInputStream
和DataOutputStream
, 在写入的时候使用writeUTF()
, 读取的时候使用readUTF()
同步非阻塞IO
示例代码如下, 与普通IO最大的区别在于, 当去读取IO数据时, 无论数据有没有准备好, 会立即返回, 线程需要不断的轮询去查看socket是否准备好, 相对于同步阻塞IO, 线程不用等在那里等待IO就绪, 但同时浪费了大量cpu的轮询时间
@Test
fun testNonBlockingIO(){
// 服务端
val channel = DatagramChannel.open().apply {
configureBlocking(false) // 非阻塞
bind(InetSocketAddress(9000)) // 绑定端口
}
Thread {
Thread.sleep(5000)
val datagramChannel = DatagramChannel.open().apply {
connect(InetSocketAddress("localhost", 9000))
}
println("client: 发送数据了")
datagramChannel.write(ByteBuffer.wrap("hello nio server".toByteArray()))
}.start()
while(true){
val byteBuffer = ByteBuffer.allocate(1024)
if(channel.receive(byteBuffer) !== null ) {
val length = byteBuffer.position()
byteBuffer.flip()
val data = ByteArray(length).apply {
System.arraycopy(byteBuffer.array(), 0, this, 0, length)
}
println("server: 接收到数据 - [${String(data)}]")
break;
} else {
println("server: 没有读取到数据, 休眠3秒再试试")
Thread.sleep(3000)
}
}
}
运行结果
server: 没有读取到数据, 休眠3秒再试试
server: 没有读取到数据, 休眠3秒再试试
client: 发送数据了
server: 接收到数据 - [hello nio server]
server: 没有读取到数据, 休眠3秒再试试
IO多路复用
I/O多路复用是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作
Java中NIO中的几个部件: Channel(通道), Buffer(缓冲区), Selector(监视器)
client与服务端通过Channel.Socket 进行连接, 数据传送至 Buffer区域, 当该区域可读时, 监视器捕获该事件, 读取并处理数据(当selector注册为 OP_READ)
与同步非阻塞IO的最大区别在于, 线程不用不断的轮询去查看io是否就绪, 而是改为等待在那里, 当io就绪后, 主动通知线程去处理, 节省了cpu轮询时间; 代码上主要引入了Selector(监视器)
废话不多说, 示例代码为 udp 通讯的 nio 实现:
@Test
fun testSelectorSocket(){
// 服务端
val channel = DatagramChannel.open().apply {
configureBlocking(false) // 非阻塞
socket().reuseAddress = true // 地址重用
bind(InetSocketAddress(9000)) // 绑定端口
}
val selector = Selector.open()
// 注册为 OP_READ 事件
channel.register(selector, SelectionKey.OP_READ)
// 启动发包线程
Thread {
for(i in 0..10) {
val datagramChannel = DatagramChannel.open().apply {
connect(InetSocketAddress("localhost", 9000))
}
datagramChannel.write(ByteBuffer.wrap("hello nio server $i".toByteArray()))
val byteBuffer = ByteBuffer.allocate(1024)
if (datagramChannel.receive(byteBuffer) !== null) {
val length = byteBuffer.position()
byteBuffer.flip()
val data = ByteArray(length).apply {
System.arraycopy(byteBuffer.array(), 0, this, 0, length)
}
println("client: 接收到数据 - [${String(data)}]")
}
Thread.sleep(2000L * i * 2)
}
}.start()
// 服务端监听
while(true) {
// 当有可读取的通道之前,该方法会一直阻塞, 阻塞超过5秒钟, 执行其他操作, 不设置则一直阻塞
if(selector.select(5000) > 0) {
val iterator = selector.selectedKeys().iterator()
iterator.forEach {
if(it.isValid && it.isReadable){
val datagramChannel = it.channel() as DatagramChannel
val byteBuffer = ByteBuffer.allocate(1024)
var socketAddress: SocketAddress? = null
if(datagramChannel.receive(byteBuffer).let { socketAddress = it; it } !== null) {
// 数据读取到bytebuffer后, position指向结尾处
val length = byteBuffer.position()
// position将会指向数据的起始位置
byteBuffer.flip()
val data = ByteArray(length).apply {
System.arraycopy(byteBuffer.array(), 0, this, 0, length)
}
println("server: 接收到数据 - [${String(data)}]")
datagramChannel.send(ByteBuffer.wrap("hello nio client".toByteArray()), socketAddress)
}
}
iterator.remove()
}
}
println("处理完一轮了, 再去看看有没有消息")
}
}
运行结果
server: 接收到数据 - [hello nio server 0]
处理完一轮了, 再去看看有没有消息
client: 接收到数据 - [hello nio client]
server: 接收到数据 - [hello nio server 1]
处理完一轮了, 再去看看有没有消息
client: 接收到数据 - [hello nio client]
server: 接收到数据 - [hello nio server 2]
处理完一轮了, 再去看看有没有消息
client: 接收到数据 - [hello nio client]
处理完一轮了, 再去看看有没有消息
...
上述例子中, 服务端会对接入的socket进行监听, 当socket就绪并且可读时, 则select()
方法会返回, 在本例中, 设置了select(5000)
超时时间
AIO(Asynchronous IO) 异步非阻塞IO
主要融合了Java的Future模式, 提供了回调的方式, 当调用阻塞操作时, 启用子线程去处理, 处理完成后, 回调后续的业务操作, 注意, 此时的线程不再是之前主线程, 如果想保持在主线程中执行后续的业务逻辑, 则可以使用 future.get() 方式, 使得主线程阻塞直到任务完成
示例代码如下:
@Test
fun testAIO() {
val channel = AsynchronousServerSocketChannel.open().apply {
bind(InetSocketAddress(9000)) // 绑定端口
}
// 异步处理 accept 事件
class AcceptHandler(serverSocketChannel: AsynchronousServerSocketChannel)
: CompletionHandler<AsynchronousSocketChannel, Nothing?> {
override fun completed(socketChannel: AsynchronousSocketChannel, attachment: Nothing?) {
println("server: 有client接入了 - $socketChannel")
// 或使用 write(ByteArray byteArray, A attachment, CompletionHandler<Long,? super A> handler); 异步触发消息发送完毕事件
socketChannel.write(ByteBuffer.wrap("hello client!".toByteArray())).get()
}
override fun failed(exc: Throwable?, attachment: Nothing?) {
}
}
channel.accept(null, AcceptHandler(channel))
// AIO Client
val byteBuffer = ByteBuffer.allocate(1024)
val socketChannel = AsynchronousSocketChannel.open().apply {
connect(InetSocketAddress("localhost", 9000)).get()
}
// 可指定超时时间, 使用 CompletionHandler 异步获取执行结果
socketChannel.read(byteBuffer).get()
val length = byteBuffer.position()
byteBuffer.flip()
val data = ByteArray(length)
System.arraycopy(byteBuffer.array(), 0, data, 0, length)
println("client: 接收到数据 - ${String(data)}")
}
也可指定线程池处理运行回调业务:
...
val channelGroup = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(20))
AsynchronousSocketChannel.open(channelGroup)
...
最后
io是计算机世界中一个重要概念, 关系着系统的性能以及吞吐量, 本文对java中的几种IO模式进行了简单总结, 不是很全面, 包括涉及的socket和多线程相关的知识, 后续慢慢整理出来, 有什么理解错误的地方, 欢迎大家拍砖
为了方便运行, 示例代码均在一个函数内, 使用kotlin语言编写