package com.nio.aio.client;
import java.io.IOException;
public class AIOClient {
public static void main(String[] args)throws IOException {
// 设置要监听的端口
int port = 8788;
if (args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
// 创建AIOTimeClientHandler线程来处理异步连接和读写操作
new Thread(new AIOTimeClientHandler("localhost", port), "AIO-client-Handler").start();
}
}
package com.nio.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AIOTimeClientHandler implements CompletionHandler<Void, AIOTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
// 构造方法
public AIOTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
// 创建AsynchronousSocketChannel对象
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
// 创建CountDownLatch进行等待,防止异步操作没有执行完成线程就退出
latch = new CountDownLatch(1);
// 发起异步操作
/**
* 参数说明
* 1 该通道要连接到的远程地址
* 2 用于回调通知时作为入参被传递
* 3 异步操作回调通知接口
*/
client.connect(new InetSocketAddress(host,port), this, this);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AIOTimeClientHandler attachment) {
// 创建发给服务端的消息
byte[] req = "AIO TIME TEST".getBytes();
// 预先分配一个1MB的缓冲区
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
// 将字节数组中的数据复制到缓冲区中
writeBuffer.put(req);
// 进行flip操作,为后续从缓冲区读取数据做准备
writeBuffer.flip();
// 进行异步写
// 第三个参数用于写操作完成后的回调
client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果发送缓冲区中仍有尚未发送的字节,将继续异步发送
if (attachment.hasRemaining()){
client.write(attachment, attachment, this);
}else {
// 发送完成,进行异步的读取操作
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 异步读取服务端的响应消息,该操作是异步操作
client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 进行flip操作,为后续从缓冲区读取数据做准备
attachment.flip();
// 根据缓冲区可读的字节数创建字节数组
byte[] bytes = new byte[attachment.remaining()];
// 将缓冲区的可读的字节数组复制到新创建的字节数组中
attachment.get(bytes);
String body;
try {
// 将读取到的数据进行转码并打印
body = new String(bytes, "utf-8");
System.out.println("服务端返回的响应时间:" + body);
// 如果当前计数大于零,则其递减。如果新的计数为零,那么所有等待的线程都将重新启用,以用于线程调度目的。
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 读取发生异常,关闭链路,同时调用countDown方法,让AIOTimeServerHandler
* 执行完毕,客户端退出执行
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, AIOTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}