参考链接:
[1] java aio 编程
[2] java AIO 服务端代码实现
主要内容:
实现服务端、客户端异步多次通信。
服务端代码:
package com.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* java AIO 服务端代码实现
* https://blog.csdn.net/qq_29048719/article/details/81045258
*/
public class AIOServer {
public final static int PORT = 9888;
private AsynchronousServerSocketChannel server;
private AsynchronousSocketChannel lastClient; // 最后一次连接的客户端
public AIOServer() throws IOException {
server = AsynchronousServerSocketChannel.open().bind(
new InetSocketAddress(PORT));
}
/**
* Future方式
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public void startWithFuture() throws InterruptedException,
ExecutionException, TimeoutException {
while (true) {
// 循环接收客户端请求
Future<AsynchronousSocketChannel> future = server.accept();
AsynchronousSocketChannel socket = future.get();// get() 是为了确保 accept 到一个连接
handleWithFuture(socket);
}
}
public void handleWithFuture(AsynchronousSocketChannel channel) throws InterruptedException, ExecutionException, TimeoutException {
ByteBuffer readBuf = ByteBuffer.allocate(2);
readBuf.clear();
while (true) {
// 一次可能读不完
//get 是为了确保 read 完成,超时时间可以有效避免DOS攻击,如果客户端一直不发送数据,则进行超时处理
Integer integer = channel.read(readBuf).get(100000, TimeUnit.SECONDS);
System.out.println("read: " + integer);
if (integer == -1) {
break;
}
readBuf.flip();
System.out.println("received: " + Charset.forName("UTF-8").decode(readBuf));
readBuf.clear();
}
}
/**
* Callback方式
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public void startWithCompletionHandler() throws InterruptedException,
ExecutionException, TimeoutException {
server.accept(null,
new CompletionHandler<AsynchronousSocketChannel, Object>() {
public void completed(AsynchronousSocketChannel result, Object attachment) {
server.accept(null, this);// 再次接收客户端连接
handleWithCompletionHandler(result);
lastClient = result;
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
}
public void handleWithCompletionHandler(final AsynchronousSocketChannel channel) {
try {
final ByteBuffer buffer = ByteBuffer.allocate(2048);
final long timeout = 30L;
channel.read(buffer, timeout, TimeUnit.MINUTES, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("read:" + result);
if (result == -1) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
return;
}
buffer.flip();
System.out.println("received message from client:" + Charset.forName("UTF-8").decode(buffer));
buffer.clear();
// 再次等待读取客户端消息
channel.read(buffer, timeout, TimeUnit.MINUTES, null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) throws Exception {
// new AIOServer().startWithFuture();
AIOServer aioServer = new AIOServer();
// 使用Callback方式
aioServer.startWithCompletionHandler();
Thread.sleep(20000);
aioServer.lastClient.write(ByteBuffer.wrap("服务端消息1".getBytes()));
Thread.sleep(1000);
aioServer.lastClient.write(ByteBuffer.wrap("服务端消息2".getBytes()));
Thread.sleep(100000);
}
}
客户端代码:
package com.aio;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AIOClient {
public final static int PORT = 9888;
private AsynchronousSocketChannel asyncClient;
public AIOClient() throws IOException {
asyncClient = AsynchronousSocketChannel.open();
}
public void startWithFuture() throws InterruptedException, ExecutionException {
asyncClient.connect(new InetSocketAddress("localhost", PORT)).get();
handleWithCompletionHandler();
}
public void handleWithCompletionHandler() {
try {
final ByteBuffer buffer = ByteBuffer.allocate(2048);
asyncClient.read(buffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("read:" + result);
if (result == -1) {
return;
}
buffer.flip();
System.out.println("received message from server:" + Charset.forName("UTF-8").decode(buffer));
buffer.clear();
// 再次等待读取服务端消息
asyncClient.read(buffer, null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
// AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// client.connect(new InetSocketAddress("localhost", 9888)).get();
// client.write(ByteBuffer.wrap("123456789".getBytes()));
// Thread.sleep(1000);
// client.write(ByteBuffer.wrap("32165498".getBytes()));
// Thread.sleep(1111111);
AIOClient aioClient = new AIOClient();
aioClient.startWithFuture();
aioClient.asyncClient.write(ByteBuffer.wrap("123456789".getBytes()));
Thread.sleep(1000);
aioClient.asyncClient.write(ByteBuffer.wrap("32165498".getBytes()));
Thread.sleep(200000);
}
}
结果输出:
AIOServer
Connected to the target VM, address: '127.0.0.1:63937', transport: 'socket'
read:9
received message from client:123456789
read:8
received message from client:32165498
AIOClient
Connected to the target VM, address: '127.0.0.1:63946', transport: 'socket'
read:16
received message from server:服务端消息1
read:16
received message from server:服务端消息2