socket(五)–AIO异步非阻
文章目录
一、简介
jdk1.7起,推出了nio2.0,引入了异步通道,即异步非阻塞AIO。这里对AIO进行介绍。
二、关键类及其方法
2.1 java.nio.channels.CompletionHandler
当接收连接accept、发起连接connect、读就绪read、写就绪write操作时的处理器,这里一个泛型,泛型的第一参数是处理的结果,第二参数是附件。内有两个方法completed和failed,即完成和失败时处理。源码如下:
package java.nio.channels;
//消费异步I/O结果的处理器
public interface CompletionHandler<V,A> {
//成功时调用
void completed(V result, A attachment);
//失败时调用
void failed(Throwable exc, A attachment);
}
2.2 java.nio.channels.AsynchronousChannelGroup
创建异步处理的共享资源,可理解为线程,使用中也是创建线程池,如:
AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
2.3 java.nio.channels.AsynchronousServerSocketChannel
服务端的channel,通过静态方法open创建,如:
AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(group);
关键方法:
public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)
accept方法用于接收连接请求,第一个参数是附件,第二个参数是收到请求后的接收处理器。对应的处理器泛型,第一个参数是处理结果(这里为AsynchronousSocketChannel,即接收到的请求channel),第二个参数是附件。
2.4 java.nio.channels.AsynchronousSocketChannel
连接channel,服务端收到连接或者客户端发起连接时创建,用于数据的读写。关键方法:
- public abstract void connect(SocketAddress remote,A attachment,CompletionHandler<Void,? super A> handler)
连接服务器,第一个参数为连接的目标地址,第二参数为附件,第三个参数为连接处理器。
对应的处理器泛型,第一个参数为空(即Void),第二个参数为附件; - public final void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler)
读取数据,第一个参数是数据读取到的目标缓存,第二个参数是附件,第三个参数是读取结束后的处理器。
对应的处理器泛型,第一个参数是读取的字节数,第二个是附件类型; - public final void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
写入数据,第一个参数是数据写入的缓存,第二个参数是附件,第三个参数是写结束后的处理器。
对应的处理器泛型,第一个参数是写入的字节数,第二个是附件类型;
三、示例
3.1 服务端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AioServerMain {
static Charset charset = Charset.forName("utf-8");
public static void main(String[] args) throws Exception {
int port = 7001;
//启动服务端
new NioServer(port).start();
TimeUnit.MINUTES.sleep(30);
}
/
* server线程
*/
static class NioServer extends Thread {
int port;
AsynchronousChannelGroup group;
AsynchronousServerSocketChannel assc;
public NioServer(int port) {
try {
this.port = port;
//创建处理线程池
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
//创建服务channel
assc = AsynchronousServerSocketChannel.open(group);
InetSocketAddress address = new InetSocketAddress(port);
//绑定地下
assc.bind(address);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//接收请求
//accept的第一个参数是附件,第二个参数是收到请求后的接收处理器
//接收处理器AcceptHandler泛型的第一个参数是处理结果(这里为AsynchronousSocketChannel,即接收到的请求channel),第二个参数是附件(这里为NioServer,即其实例),
assc.accept(this, new AcceptHandler());
}
}
/
* 接收请求处理器
* CompletionHandler泛型的第一个参数是处理结果(这里为AsynchronousSocketChannel,即接收到的请求channel),第二个参数是附件(这里为NioServer,即其实例)
*/
static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, NioServer> {
@Override
public void completed(AsynchronousSocketChannel result, NioServer attachment) {
//继续接收下一个请求,构成循环
attachment.assc.accept(attachment, this);
try {
System.out.println("accept from:" + result.getRemoteAddress().toString());
//定义数据读取缓存
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//读取数据,并传入数据到达时的处理器,
//read的第一个参数是数据读取到的目标缓存,第二个参数是附件,第三个参数是读取结束后的处理器
//读取处理器泛型的第一个参数是读取的字节数,第二个是附件类型
result.read(readBuffer, readBuffer, new ReadHandler(result));
//新开线程发送数据
new WriteThread(result).start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, NioServer attachment) {
}
}
/
* 读数据处理器
* CompletionHandler泛型的第一个参数是读取的字节数,第二个是附件类型
*/
static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel asc;
public ReadHandler(AsynchronousSocketChannel asc) {
this.asc = asc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String readMsg = charset.decode(attachment).toString();
System.out.println("server receive msg:" + readMsg);
attachment.compact();
//继续接收数据,构成循环
asc.read(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
/
* 写数据线程
*/
static class WriteThread extends Thread {
private AsynchronousSocketChannel channel;
public WriteThread(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
//定义写缓冲
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
System.out.print("server send msg:");
String msg = scanner.nextLine();
writeBuffer.put(charset.encode(msg));
writeBuffer.flip();
//写入数据,并有写数据时的处理器,
//write的第一个参数是数据写入的缓存,第二个参数是附件,第三个参数是写结束后的处理器
//读取处理器泛型的第一个参数是写入的字节数,第二个是附件类型
channel.write(writeBuffer, writeBuffer, new WriteHandler(channel, scanner));
}
}
/
* 写数据处理器
*/
static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
Scanner scanner;
public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) {
this.channel = channel;
this.scanner = scanner;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.compact();
System.out.print("server send msg:");
String msg = scanner.nextLine();
attachment.put(charset.encode(msg));
attachment.flip();
//继续写数据,构成循环
channel.write(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
}
3.2 客户端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AioClientMain {
static Charset charset = Charset.forName("utf-8");
public static void main(String[] args) throws Exception {
int port = 7001;
String host = "127.0.0.1";
//启动客户端
new NioClient(port, host).start();
TimeUnit.MINUTES.sleep(30);
}
/
* client线程
*/
static class NioClient extends Thread {
int port;
String host;
AsynchronousChannelGroup group;
AsynchronousSocketChannel asc;
InetSocketAddress address;
public NioClient(int port, String host) {
try {
this.port = port;
this.host = host;
//创建处理线程组
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
//创建客户端channel
asc = AsynchronousSocketChannel.open(group);
address = new InetSocketAddress(host, port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//接收请求,并传入收到请求后的处理器
//connect方法的第一个参数为连接的目标地址,第二参数为附件,第三个参数为连接处理器
//连接处理器泛型的第一个参数为空(即Void),第二个参数为附件
asc.connect(address, asc, new ConnectHandler());
}
}
/
* 连接服务处理器
* 连接处理器CompletionHandler泛型的第一个参数为空(即Void),第二个参数为附件
*/
static class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {
@Override
public void completed(Void result, AsynchronousSocketChannel attachment) {
try {
System.out.println("connect server:" + attachment.getRemoteAddress().toString());
//定义数据读取缓存
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//读取数据,并传入数据到达时的处理器
attachment.read(readBuffer, readBuffer, new ReadHandler(attachment));
//新开线程发送数据
new WriteThread(attachment).start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
}
}
/
* 读数据处理器
*/
static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel asc;
public ReadHandler(AsynchronousSocketChannel asc) {
this.asc = asc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String readMsg = charset.decode(attachment).toString();
System.out.println("client receive msg:" + readMsg);
attachment.compact();
//继续接收数据,构成循环
asc.read(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
/
* 写数据线程
*/
static class WriteThread extends Thread {
private AsynchronousSocketChannel channel;
public WriteThread(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
//定义写缓冲
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
System.out.print("client send msg:");
String msg = scanner.nextLine();
writeBuffer.put(charset.encode(msg));
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new WriteHandler(channel, scanner));
}
}
/
* 写数据处理器
*/
static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
AsynchronousSocketChannel channel;
Scanner scanner;
public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) {
this.channel = channel;
this.scanner = scanner;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.compact();
System.out.print("client send msg:");
String msg = scanner.nextLine();
attachment.put(charset.encode(msg));
attachment.flip();
//继续写数据,构成循环
channel.write(attachment, attachment, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
}