在java中,支持异步模型的方式有两个类:
- Future类
- Callable接口
严格来说,Future不能算是异步模型的类,因为future.get()方法是阻塞的,需要等待处理完成;而Callable是回调,是正宗的异步模型工具。
前言介绍
在Java中,提供了一些关于使用IO的API,可以供开发者来读写外部数据和文件,我们称这些API为Java IO。IO是Java中比较重要知识点,且比较难学习的知识点。并且随着Java的发展为提供更好的数据传输性能,目前有三种IO共存;分别是BIO、NIO和AIO。
这是主要记录一下AIO的代码实现
基于Future
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @description:
* @create: 2021-08-26 11:18
**/
public class BaseOnFuture {
@Test
public void AsynchronousServerSocketChannelServer() {
try {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(8888));
while (true) {
Future<AsynchronousSocketChannel> conn = channel.accept();
// 阻塞等待直到future有结果
AsynchronousSocketChannel asyncSocketChannel = conn.get();
// 异步处理连接
asyncHandle(asyncSocketChannel);
}
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 基于future 实际上是同步的读取方式
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
ByteBuffer dst = ByteBuffer.allocate(1024);
// based on Future,
// 实际上是同步处理的方式,为了不将处理变成阻塞式单连接的socket形式,使用子线程来获取输入流
new Thread(() -> {
while (asyncSocketChannel.isOpen()) {
Future<Integer> readFuture = asyncSocketChannel.read(dst);
try {
// 阻塞等待读取结果
Integer readResult = readFuture.get();
if (readResult > 0) {
System.out.println("收到来自客户端的信息:\n"+new String(dst.array(), StandardCharsets.UTF_8));
dst.clear();
} else {
// doOtherthing
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}).start();
}
// Future
@Test
public void TestAsyncSocketChannelClient() {
while(true){
System.out.println("你想对服务器说:");
Scanner s = new Scanner(System.in);
String line = s.nextLine();
AsynchronousSocketChannel asc;
try {
asc = AsynchronousSocketChannel.open();
Future<Void> connResult =
asc.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
// 等待连接成功。连接成功会返回null;
connResult.get();
// 读写操作
ByteBuffer buf = ByteBuffer.wrap(line.getBytes());
asc.write(buf).get();
// TODO
asc.close();
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
使用流程:
1.先运行AsynchronousServerSocketChannelServer服务端,这时候该线程会一直阻塞等待连接
一直运行中,不用管
2.启动客户端,并在terminal终端,输入你想要发送给服务端的信息
扫描二维码关注公众号,回复:
16481174 查看本文章
3.查看服务端是否收到信息
说明成功了
基于回调(callback)
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @create: 2021-08-26 11:19
**/
public class BaseOnCallBack {
@Test
public void AsynchronousServerSocketChannelCallbackServer() {
try {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(8888));
channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
System.out.println("accept completed");
// 异步处理连接
asyncHandle(result);
// 继续监听accept
channel.accept(null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("accept failed");
}
});
// 让主线程保持存活
while (true) {
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 基于回调
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
ByteBuffer dst = ByteBuffer.allocate(1024);
asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (result > 0) {
System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
dst.clear();
}
// 注册回调,继续读取输入
asyncSocketChannel.read(dst, null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
// TODO Auto-generated method stub
}
});
}
// callback
@Test
public void TestAsyncSocketChannelCallbackClient() {
while(true){
System.out.println("你想对服务器说:");
Scanner s = new Scanner(System.in);
String line = s.nextLine();
CountDownLatch latch = new CountDownLatch(1);
AsynchronousSocketChannel asc;
try {
asc = AsynchronousSocketChannel.open();
asc.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888), null,
new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
System.out.println("connect complete;");
// 写操作
ByteBuffer buf = ByteBuffer.wrap(line.getBytes());
asc.write(buf, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("write completed,close channnl");
try {
latch.countDown();
asc.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
try {
asc.close();
latch.countDown();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
// TODO Auto-generated method stub
}
});
// 等待回调结束
latch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
基于回调的调用流程同上!
下一篇:java NIO 服务端和客户端相互通信