java BIO NIO AIO总结
java io操作是编程人员经常使用到的,以前只是使用没有对这三种IO做系统的了解,本文将对这三种IO作详细的介绍并附有测试完整代码
同步与异步,阻塞与非阻塞
首先,了解一下 同步与异步,阻塞与非阻塞的概念
同步和异步
同步和异步是对通信机制而言的,重点在客户端的感知,同步就是指在请求者未得到应答时一直等待下去,直到回应后处理结束。异步是指请求者发送请求后,直接返回,暂时不关心结果,直接处理其它业务,此时服务接收者通过状态或回调方式通知请求者结果。
举个栗子:
小明同学上午去新华书店买一本高考黄岗数学试题,店内人员告知暂时没有这本书,要等到下午4点以后新书才会到,这里小明有两个选择:
1。(同步)小明就在新华书店坐等新书的到来,下午4点后买到书直接回家
2。(异步)小明知道了下午4点后新书才会到,所以就直接回家了,回到家后吃饭睡觉,打游戏,当然还有学习(一本正经的微笑),到下午4点再去书店买。
这个例子中:小明即是请求者,新华书店就是服务端,第一种小明坐等就是等待应答(买到书),而第二种方式到新华书店得知下午4点才书才会到这个就是状态,回家做其它事就是处理其它业务,下午4点去取书就是通过之前了解到的一个状态采取的行动。当然,如果书店有送货服务那就是回调的方式送货到家,小明签收。阻塞和非阻塞
阻塞和非阻塞是对消息返回的时刻而言的,重点在服务端的处理过程。阻塞就是在当前线程处理完这一件事情时不会处理其它,直到此线程处理完成返回(线程挂起)。非阻塞是不等待结果,只发出处理这件事情的动作,结果通过其它方式返回(线程不挂起)可以处理其它事情。
还是上面的例子:店内人员得知小明要买高考黄岗数学试题,而店内没有这本书,此时店内人员什么都不做等着这本书的到来交给小明(阻塞线程挂起)。而如果这位工作人员比较敬业的话,小明需要的书没有,就暂时放在一边(此时运送此书的人仍然在加急处理),推荐其它高考模拟题并顺带推荐介绍其它各种产品(非阻塞)
一、JAVA BIO概述
java BIO也称传统的IO实现方式(JDK1.4之前的方式),是同步阻塞式IO。原理很简单,服务端启动监听客户端连接并处理发送的数据,完成后返回相应的数据,主要有两个概念:
● ServerSocket 服务端连接,负责绑定IP和端口,监听客户端发送过来的信息
● Socket 客户端连接,负责连接服务端并发送数据,接收应答数据
所以一般情况下这种同步阻塞IO的编程方式是:一个独立线程负责启动服务端ServerSocket 连接,在接收到客户端发送的信息后,单独new 线程处理每一个客户(当然这里也可以使用线程池,用固定线程数处理客户请求。
● 优点:编程简单,逻辑清楚
● 缺点:每个客户端都是一个单独的线程处理,增加了系统线程间切换的开销,服务端线程数等于客户端,这样对于高并发,需要应对大量客户请求是不适应的。
二、JAVA NIO概述
从JDK1.4以后引入NIO(Non-block I/O)的概念,属于同步非阻塞IO,其目的是提高处理效率,可以用于高并发,高负载的网络开发应用,相应的主要有以下三个概念:
● channels 通道或管道也可以理解为数据流,用于读写入数据,而且是双向通道
● Buffer 缓冲区用于存储具体的数据,channel读数据或写数据都是读到或写到缓冲区,NIO对数据的操作都是通过此缓冲区来操作的。缓冲区实际上是一数组,并提供了不同的数据结构处理读写信息:ByteBuffer、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer
● Selector选择器也叫多路复用器,是NIO的基础,Selector会不断的轮询注册到其上的channel(调用select()方法进行轮询,此方法阻塞直到有事件就绪),如果某个通道发生了读写事件,则此通道就属于就绪状态,就会被作为SelecttionKey轮询出来,通过SelecttionKey即可获取就绪的channel然后进行下应的读写处理。
● SelecttionKey,包含通道信息,并提供监听感兴趣的事件:
○ OP_ACCEPT接受请求事件
○ OP_CONNECT客户端请求一个连接事件
○ OP_READ读事件
○ OP_WRITE写事件
常用的channels:
● SelectableChannel用于网络读写
○ ServerSocketChannel 服务端,对应BIO的ServerSocket
○ SocketChannel 客户端,对应BIO的Socket
○ DatagramChannel UDP读写通道
● FileChannel 用于文件读写操作(注此channel只能设置为阻塞方式)
其设置思想是 服务端开启一个服务通道绑定端口号并设置为非阻塞模式,然后以监听接受事件注册到多路复用器,此时循环开始多路复用器轮询模式直到有客户端连接就绪,转为注册读事件读取客户端发送的内容处理完成后通过相应的Buffer写回通道,对应的客户端也开启相同的轮询模式取回返回的信息。
需要注意的是SocketChannel 写回信息时也是异步非阻塞的,所以不能保证一次把数据写完全,需要注册写操作,不断的轮询直到Buffer.hasRemaining()返回false。
三JAVA AIO概述
AIO也称为(NIO2.0),从JDK1.7开始引入的新异步通道概念,他不需要Selector过多的对通道事件轮询即可实现异步读写,简化了NIO的复杂度。其特点也与NIO类似,按上面的例子说就是调用了回调函数把书送到小明家里,小明只要签收即可不需要再过多(轮询)的关注。
主要包含以下三个概念:
● AsynchronousServerSocketChannel 对应NIO中的ServerSocketChannel
● AsynchronousSocketChannel对应NIO中的SocketChannel
● CompletionHandler回调接口,所使用的回调函数必须实现此接口,在socket进行accept/connect/read/write等操作就绪时回调。另外还可以使用Future的方式设置回调。
● AsynchronousChannelGroup其内部其实是一个(一些)线程池来进行实质工作的;而他们干的活就是等待IO事件,处理数据,分发各个注册的CompletionHandler。与之前的Selector的工作类似。
原理上与NIO类似(除了轮询),只是编码方式不同。不同的是数据处理交由系统完成,我们需要做的就是在CompletionHandler中处理业务逻辑。
四、BIO、NIO、AIO适用场景分析:
- BIO
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。 - NIO
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。 - AIO
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
另外,I/O属于底层操作,需要操作系统支持,并发也需要操作系统的支持,所以性能方面不同操作系统差异会比较明显。
题外话:CountDownLatch(闭锁)
在使用AIO开发时使用了这个CountDownLatch,了解一下,是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
2。Pipe
广义上讲,管道就是一个用来在两个实体之间单向传输数据的导管。管道的概念对于Unix(和类Unix)操作系统的用户来说早就很熟悉了。Unix系统中,管道被用来连接一个进程的输出和另一个进程的输入。Pipe类实现一个管道范例,不过它所创建的管道是进程内(在Java虚拟机进程内部)而非进程间使用的。
Pipe类定义了两个嵌套的通道类来实现管路。这两个类是Pipe.SourceChannel(管道负责读的一端)和Pipe.SinkChannel(管道负责写的一端)。这两个通道实例是在Pipe对象创建的同时被创建的,可以通过在Pipe对象上分别调用source( )和sink( )方法来取回。
管道可以被用来仅在同一个Java虚拟机内部传输数据。虽然有更加有效率的方式来在线程之间传输数据,但是使用管道的好处在于封装性。生产者线程和用户线程都能被写道通用的Channel API中。根据给定的通道类型,相同的代码可以被用来写数据到一个文件、socket或管道。选择器可以被用来检查管道上的数据可用性,如同在socket通道上使用那样地简单。这样就可以允许单个用户线程使用一个Selector来从多个通道有效地收集数据,并可任意结合网络连接或本地工作线程使用。因此,这些对于可伸缩性、冗余度以及可复用性来说无疑都是意义重大的。
Pipes的另一个有用之处是可以用来辅助测试。一个单元测试框架可以将某个待测试的类连接到管道的“写”端并检查管道的“读”端出来的数据。它也可以将被测试的类置于通道的“读”端并将受控的测试数据写进其中。两种场景对于回归测试都是很有帮助的。
管路所能承载的数据量是依赖实现的(implementation-dependent)。唯一可保证的是写到SinkChannel中的字节都能按照同样的顺序在SourceChannel上重现。
nio提供了一个通道工具类java.nio.channels.Channels。此类定义了支持java.io包中的流类与此包中的信道类之间进行互操作的静态方法。这些方法返回的包封Channel对象可能会也可能不会实现InterruptibleChannel接口,它们也可能不是从SelectableChannel引申而来。因此,可能无法将这些包封通道同java.nio.channels包中定义的其他通道类型交换使用。细节是依赖实现的。如果您的程序依赖这些语义,那么请使用操作器实例测试一下返回的通道对象。
参考:
http://blog.csdn.net/anxpp/article/details/51512200
http://www.importnew.com/15731.html
http://blog.csdn.net/will_awoke/article/details/26453085
测试代码
BIOServer:
package com.xps.test.io;
import com.xps.test.util.Calculator;
import com.xps.test.util.TestUtil;
import javax.script.ScriptException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* BIOServer 服务端
*/
public class BIOServer{
private static final String LOCK="lock";
//全局唯一
private static ServerSocket server;
private static boolean isContinue = true;
//使用线程池最大10个线程处理
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void start(){
synchronized (LOCK) {
if(server != null) {return;}
try {
//创建ServerSocket
server = new ServerSocket(TestUtil.DEFAULT_PORT);
TestUtil.log("==========服务器已经启动=========,监听端口:"+server.getLocalPort());
while (isContinue) {//在关闭之前一直循环处理
//接收客户端请求,阻塞
Socket socket = server.accept();
//单独启动一个线程处理
//new Thread(new ServerHandler(socket)).start();
//使用线程池处理客户端请求
executorService.execute(new ServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(server != null){server.close(); server = null;}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket){
this.socket = socket;
}
public void run() {
BufferedReader reader = null;
PrintWriter writer = null;
try {
//读取客户端信息
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//负责把结果写给客户端
writer = new PrintWriter(socket.getOutputStream(),true);
String expression = null;
while ((expression = reader.readLine())!=null) {
String result = Calculator.calc(expression).toString();
writer.println(result);
}
} catch (IOException e) {
e.printStackTrace();
} catch (ScriptException e) {
e.printStackTrace();
} finally {
try {
if(reader != null){reader.close(); }
if(writer != null){writer.close(); }
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
BIO客户端
package com.xps.test.io;
import com.xps.test.util.TestUtil;
import java.io.*;
import java.net.Socket;
/**
* Created by xiongps on 2017/8/30.
*/
public class BIOClient {
public static void sendMsg(String msg) {
TestUtil.log("----需要计算的表达式:"+msg);
Socket socket =null;
BufferedReader reader = null;
PrintWriter writer = null;
try {
//创建一个Socket客户端
socket = new Socket(TestUtil.DEFAULT_HOST,TestUtil.DEFAULT_PORT);
//把信息写入到socket
writer = new PrintWriter(socket.getOutputStream(),true);
writer.println(msg);
//接收服务端返回的信息
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
TestUtil.log("----计算结果:"+reader.readLine());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(reader != null){reader.close(); }
if(writer != null){writer.close(); }
if(socket != null){socket.close();}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
测试类:
package com.xps.test.xtest;
import com.xps.test.io.BIOClient;
import com.xps.test.io.BIOServer;
import com.xps.test.util.TestUtil;
import java.util.Random;
/**
* Created by xiongps on 2017/8/30.
*/
public class BIOServerTest {
public static void main(String []args){
try {
TestUtil.log("===开始运行服务器===");
startServer();
Thread.sleep(1000); //等待1秒
startClient();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void startServer(){
new Thread(new Runnable() {
public void run() {
BIOServer.start();
}
}).start();
}
private static void startClient(){
new Thread(new Runnable() {
public void run() {
while (true) {
char operators[] = {'+','-','*','/'};
Random random = new Random(System.currentTimeMillis());
StringBuilder expression = new StringBuilder(String.valueOf(random.nextInt(100)));
expression.append(operators[random.nextInt(4)])
.append(random.nextInt(100))
.append(operators[random.nextInt(4)])
.append(random.nextInt(100));
BIOClient.sendMsg("("+expression.toString()+") > "+random.nextInt(100));
try {
Thread.currentThread().sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
NIOServer
package com.xps.test.io;
import com.xps.test.util.Calculator;
import com.xps.test.util.TestUtil;
import javax.script.ScriptException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by xiongps on 2017/8/30.
*/
public class NIOServer {
private static ServerHandler serverHandler;
public static void start(){
if(serverHandler != null) {
serverHandler.stop();
}
serverHandler = new ServerHandler(TestUtil.DEFAULT_PORT);
new Thread(serverHandler,"NIOServer").start();
}
public static void stop(){
if(serverHandler != null) {
serverHandler.stop();
}
serverHandler = null;
}
public static class ServerHandler implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private boolean isStart = false;
public ServerHandler(int port) {
try {
//开启一个选择器
selector = Selector.open();
//开启一个通道
serverSocketChannel = ServerSocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定端口 backlog设为1024
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//注册到选择器,监听客户端,服务器 感兴趣的事情当然是接收客户端的信息
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
isStart = true;
TestUtil.log("===服务器启动完成,开始监听并接收信息===端口为:"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop(){
isStart = false;
}
public void run() {
while (isStart) {
try {
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
try {
handlerKey(key);
} catch (ScriptException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
try {
if(selector != null) {selector.close(); selector = null;}
if(serverSocketChannel != null) {serverSocketChannel.close();serverSocketChannel = null;}
} catch (IOException e) {
e.printStackTrace();
}
}
private void handlerKey(SelectionKey key ) throws IOException, ScriptException {
if(!key.isValid()) {
return;
}
if(key.isAcceptable()) {//接受客户端的连接,并注册读事情
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = channel.accept();
if(socketChannel == null){return;}
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
}
//如果是读事情就绪,则处理读取客户端发送过来的内容
if(key.isReadable()) {
SocketChannel socketChannel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(buffer);
if(len > 0) {
//处理缓冲区
buffer.flip();
byte []bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String expression = new String(bytes,"UTF-8");
String result = Calculator.calc(expression).toString();
//得到结果后,把结果写回通道
byte []resultBytes =result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(resultBytes.length);
writeBuffer.put(resultBytes);
writeBuffer.flip();
while (writeBuffer.hasRemaining()) {
socketChannel.write(writeBuffer);
}
} else if(len < 0) {
//链路关闭未读到信息
socketChannel.close();
key.cancel();
}
}
}
}
}
NIO客户端:
package com.xps.test.io;
import com.xps.test.util.TestUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* Created by xiongps on 2017/8/30.
*/
public class NIOClient {
private static ClientHandler clientHandler = null;
public static void start(){
if(clientHandler != null){
clientHandler.stop();
}
clientHandler = new ClientHandler(TestUtil.DEFAULT_HOST,TestUtil.DEFAULT_PORT);
new Thread(clientHandler,"client").start();
}
public static void stop(){
if(clientHandler != null){
clientHandler.stop();
}
clientHandler = null;
}
public static boolean send(String msg) {
if("exit".equals(msg)){
return false;
}
clientHandler.sendMsg(msg);
return true;
}
public static class ClientHandler implements Runnable {
private Selector selector;
private SocketChannel channel;
private String host;
private int port;
private boolean isStart = false;
public ClientHandler(String host,int port){
this.host = host;
this.port = port;
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
boolean isc = channel.connect(new InetSocketAddress(host,port));
if(!isc) {
channel.register(selector, SelectionKey.OP_CONNECT);
}
isStart = true;
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendMsg(String msg) {
try {
channel.register(selector,SelectionKey.OP_READ);
byte []bytes = msg.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop(){
isStart = false;
}
public void run() {
try {
while (isStart) {
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
handlerKey(key);
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void handlerKey(SelectionKey key) throws IOException{
if(!key.isValid()){return;}
SocketChannel channel = (SocketChannel)key.channel();
if(key.isConnectable()) {
if(!channel.finishConnect()){
System.exit(1);
}
}
//只对读感兴趣
if(key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if(len >0) {
buffer.flip();
byte []bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
TestUtil.log("客户端收到的结果:"+result);
}
}
}
}
}
测试类:
package com.xps.test.xtest;
import com.xps.test.io.NIOClient;
import com.xps.test.io.NIOServer;
import com.xps.test.util.TestUtil;
import java.util.Scanner;
/**
* Created by xiongps on 2017/8/30.
*/
public class NIOServerTest {
public static void main(String []args){
try {
TestUtil.log("===开始运行服务器===");
NIOServer.start();
Thread.sleep(1000); //等待1秒
NIOClient.start();
boolean bo =true;
while (bo){
bo = NIOClient.send(new Scanner(System.in).nextLine());
if(!bo){break;}
}
TestUtil.log("===停止运行客户端===");
NIOClient.stop();
TestUtil.log("===停止运行服务器===");
NIOServer.stop();
System.exit(0);//正常退出
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
AIOServer
package com.xps.test.io;
import com.xps.test.util.Calculator;
import com.xps.test.util.TestUtil;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
/**
* Created by xiongps on 2017/8/31.
*/
public class AIOServer {
private static AsyncServerHandler asyncServerHandler;
public static void start(){
if(asyncServerHandler != null){return;}
asyncServerHandler = new AsyncServerHandler(TestUtil.DEFAULT_PORT);
new Thread(asyncServerHandler,"AIOServer").start();
}
//单独线程处理接收客户端发送的信息
public static class AsyncServerHandler implements Runnable {
private AsynchronousServerSocketChannel channel;
private CountDownLatch latch;
public AsyncServerHandler(int port) {
try {
//打开通道
channel = AsynchronousServerSocketChannel.open();
//绑定端口号
channel.bind(new InetSocketAddress(port),1024);
//在设定的线程启动完成之前,阻塞
latch = new CountDownLatch(1);
TestUtil.log("服务端启动");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
channel.accept(this,new AcceptHandler());
TestUtil.log("服务端开始等待");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class AcceptHandler implements
CompletionHandler<AsynchronousSocketChannel,AsyncServerHandler> {
public void completed(AsynchronousSocketChannel resultChannel, AsyncServerHandler asyncServerHandler) {
asyncServerHandler.channel.accept(asyncServerHandler,this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
resultChannel.read(buffer, buffer, new ReadServerHandler(resultChannel));
}
public void failed(Throwable exc, AsyncServerHandler attachment) {
}
public class ReadServerHandler implements CompletionHandler<Integer, ByteBuffer>{
private AsynchronousSocketChannel channel;
public ReadServerHandler(AsynchronousSocketChannel channel){
this.channel = channel;
}
public void completed(Integer result, ByteBuffer byteBuffer) {
byteBuffer.flip();
byte []bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
try {
String expression = new String(bytes,"UTF-8");
String ret = Calculator.calc(expression).toString();
byte []rBytes = ret.getBytes();
ByteBuffer wBuffer = ByteBuffer.allocate(rBytes.length);
wBuffer.put(rBytes);
wBuffer.flip();
channel.write(wBuffer, wBuffer, new CompletionHandler<Integer, ByteBuffer>() {
public void completed(Integer result, ByteBuffer wBuffer) {
if(wBuffer.hasRemaining()) {
channel.write(wBuffer,wBuffer,this);
} else {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer,buffer,new ReadServerHandler(channel));
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (ScriptException e) {
e.printStackTrace();
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
}
}
}
客户端类:
package com.xps.test.io;
import com.xps.test.util.TestUtil;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* Created by xiongps on 2017/8/31.
*/
public class AIOClient {
private static AysncClientHandler aysncClientHandler;
public static void start(){
if(aysncClientHandler != null) {
return;
}
aysncClientHandler = new AysncClientHandler(TestUtil.DEFAULT_HOST,TestUtil.DEFAULT_PORT);
new Thread(aysncClientHandler,"AIOClient").start();
}
public static boolean send(String msg){
if("a".equals(msg)){return false;}
aysncClientHandler.send(msg);
return true;
}
public static class AysncClientHandler implements CompletionHandler<Void,AysncClientHandler>,Runnable {
private AsynchronousSocketChannel channel;
private String host ;
private int port;
private CountDownLatch latch;
public AysncClientHandler(String host,int port){
this.port = port;
this.host = host;
try {
this.channel = AsynchronousSocketChannel.open();
latch = new CountDownLatch(1);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
channel.connect(new InetSocketAddress(host,port),this,this);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void completed(Void result, AysncClientHandler attachment) {
}
public void failed(Throwable exc, AysncClientHandler attachment) {
}
public boolean send(String msg) {
byte []bytes = msg.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(bytes);
buffer.flip();
channel.write(buffer,buffer,new WriteHandler(channel,latch));
return true;
}
public class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {
private AsynchronousSocketChannel channel;
private CountDownLatch latch;
public WriteHandler(AsynchronousSocketChannel channel,CountDownLatch latch) {
this.channel = channel;
this.latch = latch;
}
public void completed(Integer result, ByteBuffer attachment) {
if(attachment.hasRemaining()){
channel.write(attachment,attachment,this);
} else {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer,buffer,new ReadHandler(channel,latch));
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
}
public class ReadHandler implements CompletionHandler<Integer,ByteBuffer> {
private AsynchronousSocketChannel channel;
private CountDownLatch latch;
public ReadHandler(AsynchronousSocketChannel channel,CountDownLatch latch) {
this.channel = channel;
this.latch = latch;
}
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte []bytes = new byte[buffer.remaining()];
buffer.get(bytes);
try {
String ret = new String(bytes,"UTF-8");
TestUtil.log("客户端接收到的结果:"+ret);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("数据读取失败...");
try {
channel.close();
latch.countDown();
} catch (IOException e) {
}
}
}
}
}
}
测试类:
package com.xps.test.xtest;
import com.xps.test.io.AIOClient;
import com.xps.test.io.AIOServer;
import com.xps.test.io.NIOClient;
import com.xps.test.io.NIOServer;
import com.xps.test.util.TestUtil;
import java.util.Scanner;
/**
* Created by xiongps on 2017/8/30.
*/
public class AIOServerTest {
public static void main(String []args){
try {
TestUtil.log("===开始运行服务器===");
AIOServer.start();
Thread.sleep(1000); //等待1秒
AIOClient.start();
boolean bo =true;
while (bo){
bo = AIOClient.send(new Scanner(System.in).nextLine());
if(!bo){break;}
}
TestUtil.log("===停止运行客户端===");
NIOClient.stop();
TestUtil.log("===停止运行服务器===");
NIOServer.stop();
System.exit(0);//正常退出
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}