网络编程-nio

NIO
1.BIO:
	Block IO,同步阻塞IO,主要应用于文件IO(stream流的方式)和网络IO(socket)
2.NIO:
	None-Block IO,非阻塞IO,jdk1.4提供的新特性;NIO主要有三个核心分:Selector,Channel,Buffer;
由Selector监听多个通道事件,数据从通道输出到缓冲区,或者从缓冲区输入入到通道中;
3.AIO:
	 Asynchronous I/O,异步非阻塞IO
4.常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer
5.NIO模式原理:
	首先ServerSocketChannel将自己的监听客户端请求SelectionKEY.OP_ACCECPT事件注册到Selector
选择器中,每当客户端请求一次SocketChannel连接,Selector监听到事件,便调用
serverSocketChannel.accept()方法获得客户端请求的SocketChannel对象(不同客户端发送不同
SocketChannel),当客户端继续发送消息,触发Selector监听的OP_READ事件,再通过
SelectionKey.channel()获取触发当前事件的SocketChannel对象;selector.keys()保存所有的Channel对象,selector.selectedKeys()保存当前触发事件的对象,它们都是Set<SelectionKey>
6.NIO的同步非阻塞性:
	NIO的非阻塞性体现在,无论是服务器端监听SocketChannel连接,监听输入输出,还是客户端请求SocketChannel连接,这些过程都是非阻塞的;NIO的同步性体现在,服务端监听事件,客户端接收服务器端发送的消息,都离不开while(true)轮询来查询
7.IO对比总结
IO 的方式通常分为几种:同步阻塞的 BIO、同步非阻塞的 NIO、异步非阻塞的 AIO。
* BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并
发局限于应用中,JDK1.4 以前的唯一选择,但程序直观简单易理解。
* NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局
限于应用中,编程比较复杂,JDK1.4 开始支持。
* AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调
用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
NIO API
1.文件NIO
* ByteBuffer类(二进制数据),该类的主要方法如下所示:
    public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
    public abstract byte[] get(); 从缓冲区获得字节数据
    public final byte[] array(); 把缓冲区数据转换成字节数组
    public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
    public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
    public final Buffer flip(); 翻转缓冲区,重置位置到初始位置
* FileChannel 类,该类主要用来对本地文件进行 IO 操作,主要方法如下所示:
    public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
    public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
    public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道
 中复制数据到当前通道
    public long transferTo(long position, long count, WritableByteChannel target),把数据从当
 前通道复制给目标通道

2.网络NIO
* 2.1 Selector(选择器),能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。
    public static Selector open(),得到一个选择器对象
    public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 
SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
    public Set<SelectionKey> selectedKeys(),从内部集合中得到所有的就绪SelectionKey
    public Set<SelectionKey> keys(),从内部集合中得到所有的SelectionKey(包含就绪和未就绪)
* 2.2 SelectionKey,代表了 Selector 和网络通道的注册关系,一共四种:
    int OP_ACCEPT:有新的网络连接可以 accept,值为 16
    int OP_CONNECT:代表连接已经建立,值为 8
    int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4
    public abstract Selector selector(),得到与之关联的 Selector 对象
    public abstract SelectableChannel channel(),得到与之关联的通道
    public final Object attachment(),得到与之关联的共享数据
    public abstract SelectionKey interestOps(int ops),设置或改变监听事件
    public final boolean isAcceptable(),是否可以accept
    public final boolean isReadable(),是否可以读
    public final boolean isWritable(),是否可以写
* 2.3 ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示:
    public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
    public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
    public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模
式,取值 false 表示采用非阻塞模式
    public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
    public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
* 2.4 SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 总是把缓冲区的数据写入通
道,或者把通道里的数据读到缓冲区。常用方法如下所示:
    public static SocketChannel open(),得到一个 SocketChannel 通道
    public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模
式,取值 false 表示采用非阻塞模式
    public boolean connect(SocketAddress remote),连接服务器
    public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成连接操作
    public int write(ByteBuffer src),往通道里写数据
    public int read(ByteBuffer dst),从通道里读数据
    public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择
器并设置监听事件,最后一个参数可以设置共享数据
    public final void close(),关闭通道

在这里插入图片描述

NIO API 测试
1.文件NIO
public class TestFileNio{
    @Test
    //NIO输出文件
    public void testWrite() throws Exception{
        //获取输出流对象
        FileOutputStream fos = new FileOutputStream("testNIO.txt");
        //获取文件通道对象
        FileChannel channel = fos.getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("Hello NIO...".getBytes());
        //翻转缓冲区指针
        buffer.flip();
        //将缓冲区数据通过通道输出
        channel.write(buffer);
        //释放资源
        fos.close();
    }
    
    @Test
    //NIO读取文件
    public void testRead() throws Exception{
        //创建需要读取的File对象
        File file = new File("testNIO.txt");
        //获取输入流对象
        FileInputStream fis = new FileInputStream(file);
        //获取文件通道对象
        FileChannel channel = fis.getChannel();
        //创建缓冲区对象
        ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
        //通过通道将文件数据读取到缓冲区
        channel.read(buffer);
        //输出到控制台
        System.out.println(new String(buffer.array()));
        //释放资源
        fis.close();
    }
    
    @Test
    //NIO复制文件
    public void testTransfer() throws Exception{
        //获取输入流,输出流对象
        FileInputStream fis = new FileInputStream("testNIO.txt");
        FileOutputStream fos = new FileOutputStream("transferNIO.txt");
        //获取输入流通道,输出流通道
        FileChannel readChannel = fis.getChannel();
        FileChannel writeChannel = fos.getChannel();
        //复制文件,从position指针0开始读取,到最后一位
        writeChannel.transferFrom(readChannel,0,readChannel.size());
        //释放资源
        fos.close();
        fis.close();
    }
}

注意:
	1.buffer.flip()方法表示翻转缓冲区,重置指针(position)到初始位置;由于通道是双向的,当往缓冲区
中put数据后,缓冲区内部指针会记录缓冲区字节数,因此在将缓冲区数据通过通道输出前必须重置缓冲区内部的
position为初始值;否则管道输出的数据就会从position后开始输出,结果输出失败
	2.NIO的transferFrom()/transferTo()方法处理大型数据也非常的高效


2.网络NIO
public class TestWebNIOServer {
    public static void main(String[] args) throws Exception {
        //开启Selector对象
        Selector selector = Selector.open();
        //开启一个服务器通道对象
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //设置服务器通道监听的端口
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //将当前服务器通道注册到Selector中,并通知Selector监听连接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            //selector定时2s查询客户端发送的SocketChannel连接
            if (selector.select(2000) == 0) {    //非阻塞
                System.out.println("当前没有连接...");
                continue;
            }
            System.out.println("检测到了连接...");
            //获取事件Set集合
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                //当前事件是请求连接
                if (key.isAcceptable()) {
                    //接收客户端请求,并设置非阻塞模式
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    //在register中注册读取事件,并设置读取数据的缓冲区
                    ByteBuffer buffer1 = ByteBuffer.allocate(1024);
                    socketChannel.register(selector,SelectionKey.OP_READ, buffer1);
                }
                //当前事件是读取数据
                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    //获取缓冲区对象
                    //buffer1 == buffer2 true
                    ByteBuffer buffer2 = (ByteBuffer) key.attachment();
                    socketChannel.read(buffer2);
                    System.out.println("客户端数据:" + new String(buffer2.array()));
                }
                //注意:每次事件处理完成之后,必须从selector集合中删除这个事件,
                //防止重复处理同一个事件
                it.remove();
            }
        }
    }
}

	
public class TestWebNIOClient {
    public static void main(String[] args) throws Exception {
        //开启SocketChannel连接;设置非阻塞,服务端IP,端口
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
        //建立跟服务器端的连接
        if (!socketChannel.connect(address)) {
            //连接失败,再次连接必须用finishConnect(),否则异常
            while (!socketChannel.finishConnect()) {
                System.out.println("客户端发起连接,如果多次连接都连不上就会抛异常...");
            }
        }
        //开始向服务器端发送数据,ByteBuffer.wrap("Hello Server...".getBytes());
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("Hello Server...".getBytes());
        socketChannel.write(buffer);
        System.in.read();
    }
}

注意:
	1.使用NIO进行通讯的时候,管道会将缓冲区的所有数据都输入输出,因此会出现很多空格乱码,最好对所有的字符串使用str.trim();
	2.ServerSocketChannel相当于BIO中的ServerSocket,每当客户端请求一次SocketChannel连接,都会通过serverSocketChannel.accept()方法获得一个新的SocketChannel对象;
案例:NIO聊天室
//聊天服务器
public class TestChatServer {
    private Selector selector = null;
    private ServerSocketChannel listenerChannel = null;
    private int port = 9999;

    //在构造方法中初始化信息
    public TestChatServer() throws Exception {
        //开启选择器对象
        selector = Selector.open();
        //开启监听通道对象,并设置初始化信息
        listenerChannel = ServerSocketChannel.open();
        listenerChannel.configureBlocking(false);
        listenerChannel.bind(new InetSocketAddress(port));
        //将监听通道的连接事件注册到选择器中
        listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    //服务器开启业务逻辑的方法
    private void start() throws Exception {
        while (true) {
            //1.每隔2s监听连接请求事件
            if (selector.select(2000) == 0) {
                System.out.println("服务器等待连接...");
                continue;
            }
            //2.有用户连接后,监听读取事件,读取用户输入的消息,并广播给其他用户
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isAcceptable()) {
                    //处理连接事件,注册读取事件
                    SocketChannel socketChannel = listenerChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    System.out.println(socketChannel.getRemoteAddress().toString() + "上线...");
                }
                //处理读取事件
                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int count = socketChannel.read(buffer);
                    if (count > 0) {
                        //读取到了消息
                        String msg = new String(buffer.array(), "UTF-8");
                        //输出到控制台
                        printlnMsg(msg.trim());
                        //广播消息
                        broadcast(msg, socketChannel);
                    }
                }
                //注意:移除已处理事件
                it.remove();
            }
        }
    }

    //排除发送消息的用户,广播消息给其他用户
    private void broadcast(String msg, SocketChannel exclude) throws Exception {
        //获取选择器中的所有通道
        for (SelectionKey key : selector.keys()) {
           Channel channel = key.channel();
            if (channel instanceof SocketChannel && channel != exclude) {
                SocketChannel socketChannel = (SocketChannel) channel;
                socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
                System.out.println("发送一个channel:"+socketChannel.getRemoteAddress().toString());
            }
        }
    }

    //打印广播到服务器控制台
    private void printlnMsg(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(sdf.format(new Date()) + " --> \n" + msg);
    }

    public static void main(String[] args) throws Exception {
        new TestChatServer().start();
    }
}

//聊天客户端
public class TestChatClient {
    private String ip = "127.0.0.1";
    private int port = 9999;
    private SocketChannel socketChannel = null;
    private InetSocketAddress address = null;
    private String username;

    //初始化
    public TestChatClient() throws Exception {
        //开启socket通道,设置初始信息
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        address = new InetSocketAddress(ip, port);
        //开始请求连接
        if (!socketChannel.connect(address))
            //连接失败接着连
            while (!socketChannel.finishConnect()) ;
        username = socketChannel.getLocalAddress().toString();
        System.out.println(username + "连接成功...");

    }

    //发送消息
    public void sendMsg(String msg) throws Exception {
        //如果发送bye就退出聊天室
        if ("bye".equalsIgnoreCase(msg)) {
            socketChannel.close();
            return;
        }
        msg += username + " say: " + msg;
        socketChannel.write(ByteBuffer.wrap(msg.trim().getBytes()));
    }

    //接收消息
    public void receiveMsg() throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = socketChannel.read(buffer);
        if (count > 0) {
            System.out.println(new String(buffer.array(), "UTF-8").trim());
        }
    }
    
      public static void main(String[] args) throws Exception {
        final TestChatClient client = new TestChatClient();
        //开启线程去接收消息
        new Thread() {
            @Override
            public void run() {
                try {
                   while(true){
                       client.receiveMsg();
                       Thread.sleep(3000);
                   }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        //发送消息
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            String msg = sc.nextLine();
            client.sendMsg(msg.trim());
        }
    }
}

猜你喜欢

转载自blog.csdn.net/qq_42514129/article/details/86298603