AIO,异步非阻塞IO,是JDK1.7引入的,在nio包下,相比NIO,多了4个异步通道类,AsynchronousSocketChannel,AsynchronousServerSocketChannel,AsynchronousFileChannel,AsynchronousDatagramChannel。和NIO类似,都是使用ByteBuffer来缓存数据,Channel来传输数据,不过它的读写方法是异步操作的。
在AIO中,主要是通过一个CompletionHandler的实现类来进行业务处理操作的,重写completed方法,在该方法中,链式的调用accept方法,进行下一次的获取客户端请求,接着再书写业务逻辑。
大致的时序图如下:
服务端代码:
public class MyServer {
public static void main(String[] args) {
new MyServer(8888);
}
private AsynchronousServerSocketChannel serverChannel;
public AsynchronousServerSocketChannel getServerChannel() {
return this.serverChannel;
}
public MyServer(int port) {
try {
init(port);
} catch (Exception e) {
e.printStackTrace();
}
}
public void init(int port) throws Exception {
//开启serverChannel
this.serverChannel = AsynchronousServerSocketChannel.open();
//绑定端口,开启服务
this.serverChannel.bind(new InetSocketAddress(port));
System.out.println("server start...");
//启动监听
this.serverChannel.accept(this, new MyServerHandler());
System.out.println("server listen port:" + port);
while(true) {
//休眠的作用是,保证前台有线程在执行
//AsynchronousServerSocketChannel的accept方法为守护线程的方式运行,所以需要一个前台线程显示的执行
Thread.sleep(Long.MAX_VALUE);
}
}
}
class MyServerHandler implements CompletionHandler<AsynchronousSocketChannel, MyServer>{
@Override
public void completed(AsynchronousSocketChannel channel, MyServer server) {
//处理下一次的客户端请求
server.getServerChannel().accept(server, this);
//业务逻辑,读取客户端消息
readClientMsg(channel);
}
@Override
public void failed(Throwable exc, MyServer attachment) {
exc.printStackTrace();
}
/**
* 读取客户端发来的信息
* @param channel
*/
public void readClientMsg(final AsynchronousSocketChannel channel) {
ByteBuffer buf = ByteBuffer.allocate(1024);
channel.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>(){
/**
* 业务逻辑
* 读取客户端的数据
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
//先flip再使用
attachment.flip();
try {
//获取客户端发来的信息
String clientMsg = new String(attachment.array(),"UTF-8");
System.out.println("client:" + clientMsg);
writeClientMsg(channel, clientMsg);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
/**
* 向客户端写信息
* @param channel
* @param clientMsg
* @throws UnsupportedEncodingException
*/
public void writeClientMsg(AsynchronousSocketChannel channel,String clientMsg) throws UnsupportedEncodingException {
ByteBuffer buf = ByteBuffer.allocate(1024);
String serverMsg = "";
clientMsg = clientMsg.trim();
if("0".equals(clientMsg)) {
serverMsg = "00000000";
}else if("1".equals(clientMsg)) {
serverMsg = "11111111";
}
else if("exit".equals(clientMsg)) {
serverMsg = "bye";
}
else {
serverMsg = "-1(不可识别) --> " + clientMsg;
}
buf.put(serverMsg.getBytes("UTF-8"));
//flip
buf.flip();
channel.write(buf);
}
}
客户端代码:
public class MyClient {
public static void main(String[] args) throws Exception {
MyClient client = new MyClient();
String host = "127.0.0.1";
int port = 8888;
Scanner scan = new Scanner(System.in);
String msg = "";
while(!"exit".equals(msg)) {
client.init(host, port);
System.out.print("client enter:");
msg = scan.nextLine();
client.writeMsg(msg);
client.readServerMsg();
}
}
private AsynchronousSocketChannel channel;
/**
* 初始化
* @param host
* @param port
* @throws Exception
*/
public void init(String host,int port) throws Exception {
channel = AsynchronousSocketChannel.open();
channel.connect(new InetSocketAddress(host, port));
}
/**
* 读取服务端消息
* @throws Exception
*/
public void readServerMsg() throws Exception {
ByteBuffer buf = ByteBuffer.allocate(1024);
//读取服务端的消息
//再调用get方法,阻塞一下,将消息完全读出来后再执行后面的语句
channel.read(buf).get();
//flip,读取数据
buf.flip();
System.out.println("server:" + new String(buf.array(),"UTF-8"));
}
public void writeMsg(String msg) throws UnsupportedEncodingException {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(msg.getBytes("UTF-8"));
//放进通道前,flip
buf.flip();
channel.write(buf);
}
}