本文承接《 NIO 理论 与 编程》
AIO 理论简介
NIO 2.0 引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供了以下两种方式获取操作结果。
1)通过 java.util.concurrent.Future 类来实现异步操作的结果
2)在执行异步操作的时候传入一个 java.nio.channels,CompletionHandler 接口的实现类作为操作完成的回调。
NIO 2.0 的异步套接字通道是真正的异步非阻塞 I/O,对应与 UNIX 网络编程中的事件驱动 I/O(AIO)。它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了 NIO 的编程模型。
AIO 编程
本文仍然以一个简单的通信例子讲解服务端与客户端的编码流程,先开启服务器,在开启客户端,客户端开启后自动发送一条消息给服务器,然后服务器收到消息后返回一条消息。
服务端
TimeServer
package com.lct.aio;
import java.util.concurrent.CountDownLatch;
/**
* Created by Administrator on 2018/10/27 0027.
* 时间服务器
*/
public class TimeServer {
public static void main(String[] args) {
/**新开一个线程处理 AIO 服务器通信
* 实际项目中视实际情况决定是否开子线程*/
AsyncTimeServerHandler asyncTimeServerHandler = new AsyncTimeServerHandler();
new Thread(asyncTimeServerHandler).start();
/**AIO 因为是异步非阻塞,所以并不会像 BIO 一样进行阻塞的进行监听,也不会像 NIO 一样使用多路复用器
* 轮询准备就绪的管道。所以为了防止程序退出,这里使用倒计数锁存器让 主线程一直阻塞,应用一直运行*/
CountDownLatch latch = new CountDownLatch(1);
try {
System.out.println(Thread.currentThread().getName() + " 线程:主线程开始阻塞,等待 AIO 服务器通信.....");
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
AsyncTimeServerHandler
package com.lct.aio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
/**
* Created by Administrator on 2018/10/27 0027.
*/
public class AsyncTimeServerHandler implements Runnable {
/**
* AsynchronousServerSocketChannel:异步服务器通道
*/
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler() {
try {
/**
* 创建异步服务器通道
* 绑定端口
*/
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
SocketAddress inetAddress = new InetSocketAddress(8080);
asynchronousServerSocketChannel.bind(inetAddress);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程:AIO 服务器初始化完成...");
/**
* <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler)
* 异步服务器套接字通道的 accept 方法用于接受客户端连接
* 因为是异步操作,可以传递一个自定义的 CompletionHandler<AsynchronousSocketChannel,? super A> 实例
* 将来客户端连接成功时,会在 AcceptCompletionHandler 中进行处理
*/
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
}
}
AcceptCompletionHandler
package com.lct.aio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* Created by Administrator on 2018/10/27 0027.
* 自定义 CompletionHandler 实例作为 handler 来接收通知消息
* CompletionHandler<V,A> 接口一共两个方法,要注意相互之间的反射参数
*/
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
/**
* 客户端连接成功时,自动进入此方法
*
* @param result
* @param attachment
*/
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
System.out.println(Thread.currentThread().getName() + " 线程:客户端连接成功...");
/**
* 从 attachment 获取成员变量 AsynchronousServerSocketChannel,然后继续调用它的 accept 方法接收客户端
* 调用 AsynchronousServerSocketChannel 的 accept 方法后,如果有新的客户端连接接入,系统将回调传入的 CompletionHandler 实例-
* 的 completed 方法,表示新的客户端连接成功。
* 因为一个 AsynchronousServerSocketChannel 可以接收成千上万个客户端,所以需要继续调用它的 accept 方法接收其它客户端的连接,最终形成一个循环。
* 每当接收一个客户端连接成功之后,再异步接收新的客户端连接。
*/
attachment.asynchronousServerSocketChannel.accept(attachment, this);
/**
* read(ByteBuffer dst,A attachment,CompletionHandler<Integer,? super A> handler)
* 从这个通道读取字节序列到给定缓冲区,启动异步读取操作
* 预分配 1 M 的字节缓冲数组,调用 AsynchronousSocketChannel 进行异步读取操作
* dst:接收缓冲区,用于从异步 Channel 中读取数据包
* attachment:异步 Channel 携带的附件,通知回调的时候作为参数使用
* handler:接收通知回调的业务 Handler
* 这里为了读取数据更加清晰,又新建一个类进行处理,实际视需求而定,可以没必要拆分这么细
*/
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
}
}
ReadCompletionHandler
package com.lct.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* Created by Administrator on 2018/10/27 0027.
* 对客户端发送来的消息进行读取以及回复消息
*/
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
/**
* 将 AsynchronousSocketChannel 作为成员变量传入,用于读取和发送消息
*/
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null) {
this.channel = channel;
}
}
/**
* 客户端发送消息过来时,自动进入此方法
*
* @param result
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
/**
* 反转此缓冲区,为后续从缓冲区读取数据做准备
* 根据缓冲区的可读字节数创建字节数组,然后将字节转为字符串,指定编码
*/
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println(Thread.currentThread().getName() + " 线程:接收到客户端消息:" + req);
doWrite("服务器已经接收到消息\"" + req + "\"");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
if (this.channel != null) {
this.channel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 给客户端返回消息
*
* @param meg
*/
private void doWrite(String meg) {
byte[] bytes = meg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
/**
* write 异步写方法,与 read 一样有三个同样的参数
*/
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
/**如果没有发送完成,则继续发送*/
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
客户端
TimeClient
package com.lct.aio;
import java.util.concurrent.CountDownLatch;
/**
* Created by Administrator on 2018/10/27 0027.
* 时间客户端
*/
public class TimeClient {
public static void main(String[] args) {
/**
* 开3个线程,模拟三个客户端通信
*/
for (int i=0;i<3;i++){
AsyncTimeClientHandler asyncTimeClientHandler = new AsyncTimeClientHandler();
new Thread(asyncTimeClientHandler).start();
}
}
}
AsyncTimeClientHandler
package com.lct.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* Created by Administrator on 2018/10/27 0027.
* 时间客户端处理器
*/
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
/**
* AsynchronousSocketChannel:异步套接字管道
* latch:为了防止通信的子线程在业务没有完成之前关闭 client,所以使用倒计数锁存器
* 当给服务器发送消息,然后接收服务器回复后,关闭 client,同时线程结束
*/
private AsynchronousSocketChannel client;
private CountDownLatch latch;
public AsyncTimeClientHandler() {
try {
/**创建异步套接字管道*/
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
/**
* 发起异步连接操作
* connect(SocketAddress remote,A attachment,CompletionHandler<Void,? super A> handler)
* remote:服务器地址
* attachment:AsynchronousSocketChannel 的附件,用于回调通知时作为入参被传递,可以自定义
* handler:异步操作回调通知接口
*/
SocketAddress socketAddress = new InetSocketAddress("192.168.1.20", 8080);
client.connect(socketAddress, this, this);
/**在这里让通信线程强制阻塞,否则因为是异步的原因会导致线程提前结束*/
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
/**业务完成后,关闭通信管道,线程退出*/
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 异步连接成功时,自动进入此方法
*
* @param result
* @param attachment
*/
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
/**
* 异步发送消息
* 与服务器完全类似,如果缓冲区数据未发生完毕,则继续异步发送
* 如果发送完成,则执行异步读取操作
*/
String message = "我是客户端 " + Thread.currentThread().getName();
final byte[] bytes = message.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, final ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
/**当消息发送完毕后,然后在这里进行异步读取服务器返回的消息*/
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer byteBuffer) {
byteBuffer.flip();
byte[] bytes1 = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes1);
try {
String body = new String(bytes1, "UTF-8");
System.out.println("收到服务器回复:" + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
测试结果
先启动服务器,然后启动客户端,控制台信息如下:
注意事项:示例中并没有完整的处理网络的半包读写,虽然在普通的环境下没有问题,但是如果进行压力或者性能测试,就会发现结果并不是预期结果。
对于半包读写之后会在详细介绍。