socket(五)–AIO异步非阻

socket(五)–AIO异步非阻

一、简介

jdk1.7起,推出了nio2.0,引入了异步通道,即异步非阻塞AIO。这里对AIO进行介绍。

二、关键类及其方法

2.1 java.nio.channels.CompletionHandler

当接收连接accept、发起连接connect、读就绪read、写就绪write操作时的处理器,这里一个泛型,泛型的第一参数是处理的结果,第二参数是附件。内有两个方法completed和failed,即完成和失败时处理。源码如下:

package java.nio.channels;
//消费异步I/O结果的处理器
public interface CompletionHandler<V,A> {
		//成功时调用
    void completed(V result, A attachment);

   //失败时调用
    void failed(Throwable exc, A attachment);
}

2.2 java.nio.channels.AsynchronousChannelGroup

创建异步处理的共享资源,可理解为线程,使用中也是创建线程池,如:

AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);

2.3 java.nio.channels.AsynchronousServerSocketChannel

服务端的channel,通过静态方法open创建,如:

AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(group);

关键方法:

public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)

accept方法用于接收连接请求,第一个参数是附件,第二个参数是收到请求后的接收处理器。对应的处理器泛型,第一个参数是处理结果(这里为AsynchronousSocketChannel,即接收到的请求channel),第二个参数是附件。

2.4 java.nio.channels.AsynchronousSocketChannel

连接channel,服务端收到连接或者客户端发起连接时创建,用于数据的读写。关键方法:

三、示例

3.1 服务端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AioServerMain {
    static Charset charset = Charset.forName("utf-8");

    public static void main(String[] args) throws Exception {
        int port = 7001;

        //启动服务端
        new NioServer(port).start();
        TimeUnit.MINUTES.sleep(30);
    }

    /
     * server线程
     */
    static class NioServer extends Thread {
        int port;
        AsynchronousChannelGroup group;
        AsynchronousServerSocketChannel assc;

        public NioServer(int port) {
            try {
                this.port = port;
                //创建处理线程池
                group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
                //创建服务channel
                assc = AsynchronousServerSocketChannel.open(group);
                InetSocketAddress address = new InetSocketAddress(port);
                //绑定地下
                assc.bind(address);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            //接收请求
            //accept的第一个参数是附件,第二个参数是收到请求后的接收处理器
            //接收处理器AcceptHandler泛型的第一个参数是处理结果(这里为AsynchronousSocketChannel,即接收到的请求channel),第二个参数是附件(这里为NioServer,即其实例),
            assc.accept(this, new AcceptHandler());
        }
    }

    /
     * 接收请求处理器
     * CompletionHandler泛型的第一个参数是处理结果(这里为AsynchronousSocketChannel,即接收到的请求channel),第二个参数是附件(这里为NioServer,即其实例)
     */
    static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, NioServer> {

        @Override
        public void completed(AsynchronousSocketChannel result, NioServer attachment) {
            //继续接收下一个请求,构成循环
            attachment.assc.accept(attachment, this);

            try {
                System.out.println("accept from:" + result.getRemoteAddress().toString());

                //定义数据读取缓存
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                //读取数据,并传入数据到达时的处理器,
                //read的第一个参数是数据读取到的目标缓存,第二个参数是附件,第三个参数是读取结束后的处理器
                //读取处理器泛型的第一个参数是读取的字节数,第二个是附件类型
                result.read(readBuffer, readBuffer, new ReadHandler(result));

                //新开线程发送数据
                new WriteThread(result).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, NioServer attachment) {

        }
    }

    /
     * 读数据处理器
     * CompletionHandler泛型的第一个参数是读取的字节数,第二个是附件类型
     */
    static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        AsynchronousSocketChannel asc;

        public ReadHandler(AsynchronousSocketChannel asc) {
            this.asc = asc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            String readMsg = charset.decode(attachment).toString();
            System.out.println("server receive msg:" + readMsg);
            attachment.compact();

            //继续接收数据,构成循环
            asc.read(attachment, attachment, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {

        }
    }

    /
     * 写数据线程
     */
    static class WriteThread extends Thread {
        private AsynchronousSocketChannel channel;

        public WriteThread(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            //定义写缓冲
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            Scanner scanner = new Scanner(System.in);
            System.out.print("server send msg:");
            String msg = scanner.nextLine();
            writeBuffer.put(charset.encode(msg));
            writeBuffer.flip();
            //写入数据,并有写数据时的处理器,
            //write的第一个参数是数据写入的缓存,第二个参数是附件,第三个参数是写结束后的处理器
            //读取处理器泛型的第一个参数是写入的字节数,第二个是附件类型
            channel.write(writeBuffer, writeBuffer, new WriteHandler(channel, scanner));
        }
    }

    /
     * 写数据处理器
     */
    static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        AsynchronousSocketChannel channel;
        Scanner scanner;

        public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) {
            this.channel = channel;
            this.scanner = scanner;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.compact();
            System.out.print("server send msg:");
            String msg = scanner.nextLine();
            attachment.put(charset.encode(msg));
            attachment.flip();

            //继续写数据,构成循环
            channel.write(attachment, attachment, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
        }
    }
}

3.2 客户端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AioClientMain {
    static Charset charset = Charset.forName("utf-8");

    public static void main(String[] args) throws Exception {
        int port = 7001;
        String host = "127.0.0.1";

        //启动客户端
        new NioClient(port, host).start();
        TimeUnit.MINUTES.sleep(30);
    }

    /
     * client线程
     */
    static class NioClient extends Thread {
        int port;
        String host;
        AsynchronousChannelGroup group;
        AsynchronousSocketChannel asc;
        InetSocketAddress address;

        public NioClient(int port, String host) {
            try {
                this.port = port;
                this.host = host;
                //创建处理线程组
                group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5);
                //创建客户端channel
                asc = AsynchronousSocketChannel.open(group);
                address = new InetSocketAddress(host, port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            //接收请求,并传入收到请求后的处理器
            //connect方法的第一个参数为连接的目标地址,第二参数为附件,第三个参数为连接处理器
            //连接处理器泛型的第一个参数为空(即Void),第二个参数为附件
            asc.connect(address, asc, new ConnectHandler());
        }
    }

    /
     * 连接服务处理器
     * 连接处理器CompletionHandler泛型的第一个参数为空(即Void),第二个参数为附件
     */
    static class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {

        @Override
        public void completed(Void result, AsynchronousSocketChannel attachment) {

            try {
                System.out.println("connect server:" + attachment.getRemoteAddress().toString());

                //定义数据读取缓存
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                //读取数据,并传入数据到达时的处理器
                attachment.read(readBuffer, readBuffer, new ReadHandler(attachment));

                //新开线程发送数据
                new WriteThread(attachment).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
        }
    }

    /
     * 读数据处理器
     */
    static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        AsynchronousSocketChannel asc;

        public ReadHandler(AsynchronousSocketChannel asc) {
            this.asc = asc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            String readMsg = charset.decode(attachment).toString();
            System.out.println("client receive msg:" + readMsg);
            attachment.compact();

            //继续接收数据,构成循环
            asc.read(attachment, attachment, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {

        }
    }

    /
     * 写数据线程
     */
    static class WriteThread extends Thread {
        private AsynchronousSocketChannel channel;

        public WriteThread(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            //定义写缓冲
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            Scanner scanner = new Scanner(System.in);
            System.out.print("client send msg:");
            String msg = scanner.nextLine();
            writeBuffer.put(charset.encode(msg));
            writeBuffer.flip();
            channel.write(writeBuffer, writeBuffer, new WriteHandler(channel, scanner));
        }
    }

    /
     * 写数据处理器
     */
    static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        AsynchronousSocketChannel channel;
        Scanner scanner;

        public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) {
            this.channel = channel;
            this.scanner = scanner;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.compact();
            System.out.print("client send msg:");
            String msg = scanner.nextLine();
            attachment.put(charset.encode(msg));
            attachment.flip();

            //继续写数据,构成循环
            channel.write(attachment, attachment, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
        }
    }
}
发布了274 篇原创文章 · 获赞 95 · 访问量 50万+

猜你喜欢

转载自blog.csdn.net/chinabestchina/article/details/105037706