系统间通信(六)---优化篇(5)

    上一篇文章我们讲了“多路复用IO”模型,即java中的NIO,但是它仍旧是一种同步IO模型。
    多路复用IO中有三个重要的概念,Channel,Selector,Buffer。记不清楚的同学可以回头再看看上一篇文章。
    本篇文章我们要讲的是JDK升级以后的NIO,即真正的异步IO,也就是AIO。

1.AIO和多路复用IO模型最大的区别是什么呢?

    多路复用IO主要是为一个Channel注册一个Selector和它关心的事件,那么应用程序就可以不用主动询问操作系统它关心的事件什么时候发生,由Selector代替应用程序去询问操作系统,这个时候应用程序就可以做其他的处理。可以看出,多路复用IO采用的是一种“轮询”的机制。

    而AIO采用的是“订阅-通知”机制。怎么理解呢?AIO中也有Channel和Buffer的概念,但是却没有了Selector。AIO中向通道注册关心的事件以后,可以理解为向操作系统订阅了该事件,应用程序这时可以做其他的处理。而操作系统在通道注册的事件发生的时候,会主动通知应用程序。

2.Java对AIO的支持:

    

注意:AsynchronousServerSocketChannel实现了NetworkChannel,而AsynchronousSocketChannel不止实现了NetworkChannel,还实现了AsynchronousByteChannel,AsynchronousByteChannel为AsynchronousSocketChannel提供了Buffer的支持。

3.代码实例:

服务端代码:

    

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class MyTest {
    
    private static final Object waitObject = new Object();

    public static void main(String[] args) {
        
        try {
            //创建AsynchronousServerSocketChannel
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            
            //为通道绑定地址
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",83));
            
            //为AsynchronousServerSocketChannel注册监听(accept事件)
            serverSocketChannel.accept(null, new ServerSocketChannelHandler(serverSocketChannel));
            
            //模拟等待,不然没有客户端连接会直接执行完毕
            synchronized(waitObject) {
                waitObject.wait();
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }
    
}

/**
 *  响应类,专门用来处理AsynchronousServerSocketChannel的accept响应
 */
class ServerSocketChannelHandler implements CompletionHandler<AsynchronousSocketChannel, Void>{
    
    private AsynchronousServerSocketChannel serverSocketChannel;
    
    public ServerSocketChannelHandler(AsynchronousServerSocketChannel serverSocketChannel){
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void failed(Throwable exc, Void attachment) {
        System.out.println("失败");
    }

    @Override
    public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
        /**
         * 客户端每次请求的时候观察这三行输出
         * this指AsynchronousServerSocketChannel对象,会发现不管请求多少次,这个对象都是同一个
         * socketChannel指每个客户端的连接通道,请求N次,每次都是不同的
         * attachment
         */
        System.out.println("this :   "+this);
        System.out.println("socketChannel :   "+socketChannel);
        System.out.println("attachment :   "+attachment);
        
        //每次都要注册监听,一次请求,一次响应
        this.serverSocketChannel.accept(attachment, this);
        
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        
        //为AsynchronousSocketChannel注册read监听(read事件)
        socketChannel.read(buffer, new StringBuffer(), new SocketChannelHandler(socketChannel,buffer));
        
    }

}

/**
 * 响应类,专门用来处理AsynchronousSocketChannel的read响应
 */
class SocketChannelHandler implements CompletionHandler<Integer, StringBuffer>{
    
    private AsynchronousSocketChannel socketChannel;
    
    private ByteBuffer buffer;
    
    public SocketChannelHandler(AsynchronousSocketChannel socketChannel,ByteBuffer buffer) {
        this.socketChannel = socketChannel;
        this.buffer = buffer;
    }

    @Override
    public void completed(Integer result, StringBuffer attachment) {
        
        System.out.println("响应read事件");
        
        if(result == -1){
            try {
                System.out.println("客户端关闭了连接通道,终止服务");
                this.socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return ;
        }
        
        this.buffer.flip();
        byte[] context = new byte[1024];
        this.buffer.get(context, 0, result);
        this.buffer.clear();
        try {
            String nowContent = new String(context, 0, result, "UTF-8");
            attachment.append(nowContent);
            System.out.println("截止目前为止,收到客户端发送的数据为: "+attachment);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        
        //说明还没有读完客户端传输的数据,重新再去读一次
        if(attachment.indexOf("over")==-1){
            return ;
        }
        
        //*****************************************
        //  收到客户端发送数据以后,服务端真正的业务处理逻辑
        //*****************************************
        System.out.println("服务端处理客户端请求");
        
        //继续下次监听
        this.socketChannel.read(this.buffer, attachment, this);
        System.out.println("本次监听完毕,开始下次监听");
        
    }

    @Override
    public void failed(Throwable exc, StringBuffer attachment) {
        try {
            System.out.println("失败");
            this.socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
}

客户端代码:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URLDecoder;

public class Client {

    public static void main(String[] args) {
        Socket socket = null;
        InputStream is = null;
        OutputStream os = null;
        try {
            //客户端请求建立连接
            socket = new Socket("localhost", 83);
            is = socket.getInputStream();
            os = socket.getOutputStream();
            byte[] buffer = new byte[1024];
            String msg = "来自客户端的请求over";
            //String msg = "来自客户端的请求";
            buffer = msg.getBytes();
            os.write(buffer);
            os.flush();
            
            byte[] receiveMsg = new byte[1024];
            String str = "";
            while(is.read()!=-1){
                is.read(receiveMsg);
                str += new String(receiveMsg);
                if(str.indexOf("over")!=-1){
                    break;
                }
            }
            str = URLDecoder.decode(str, "UTF-8");
            System.out.println("服务器端返回的信息为:"+str);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
            try {
                is.close();
                os.close();
                socket.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }        
    }
}

结果:

客户端模拟两次发送,第一次发送的信息为“来自客户端的请求”,第二次发送的信息为“来自客户端的请求over”。


可以看到:不管客户端请求多少次,服务端都是用一个AsynchronousServerSocketChannel(服务器监听通道)来处理多个客户端请求的。而且每个客户端请求的AsynchronousSocketChannel(Socket套接字通道)都是独立的。

4.扩展:

    拓展的内容就是现在很多人工作中用到的框架了。IO通信模型也有框架,比如Netty,MINA等。

    为什么要有框架呢?虽然java提供了多种IO模型来优化系统间通信,但是就像我们第一篇文章中讲的,系统间通信除了IO模型,还有其他的内容,比如消息的格式,协议等。而框架就是在java提供的IO模型的基础上,优化了消息格式,各种协议。而且,java中的IO模型有一个bug,Selector的select(long timeout)方法不会阻塞,那么就可能出现CPU使用率达到100%,Netty框架中就解决了这个问题。

猜你喜欢

转载自blog.csdn.net/Dream_Ryoma/article/details/80507584