摘要:读完本章,您将了解什么是NIO,NIO的基本原理,NIO的基本用法。
概念:直贴百度百科上的描述
NIO主要涉及Channel、Buffer、Selector。Channel译为通道,类似流,主要负责数据的传输;Buffer译为数据缓冲区,Channel上的数据只能通过Channel提供的read()或者write()方法读到Buffer或者将Buffer的数据写到Channel上,Buffer为所有原始类型(boolean类型除外)提供了子类(但我觉得除了ByteBuffer外,其他用处很小,因为Channel读或写的时候只能接受ByteBuffer类型,不像IO流有字符流可以很好的进行读写,这也是我对NIO的困惑之一,希望哪位高人指点一下);Selector译为选择器,正是因为有了Selector,NIO才有了它的强大之处(非阻塞式的高伸缩性网络),一个Selector可以管理多个非阻塞Channel,只有非阻塞Channel才可以注册到Selector上,换言之Channel注册之前需要设置为非阻塞(open之后默认是阻塞的),而像FileChannel是阻塞的不能注册到selector。
Channel介绍:
Java NIO 中主要的通道实现有:
a、FileChannel 从文件读写数据
b、DatagramChannel 通过UDP读写网络数据
c、SocketChannel 通过TCP读写网络上的数据,主要有两种方式创建SocketChannel,方式一为打开一个SocketChannel并连接到互联网上的某个服务器,方式二为一个新连接到达ServerSocketChannel时会创建一个SocketChannel。
d、ServerSocketChannel 可以监听新进来的TCP连接,像WEB服务器那样。为每一个新进来的连接都会创建一个SocketChannel。
针对每个Channel行驶的功能不一样,方法也略有不同,这里不再贴API。
Buffer介绍:
Buffer为什么叫数据缓冲区,我觉得可以这样解释,无论是解析通道(channel)的数据还是将我们想要传输的数据写到通道(channel)上,都需要经过Buffer,Buffer作为数据传输解析中间过度的存在者,所以叫做数据缓冲区。换言之,如果想要读通道上的数据,需要先读通道上的数据写到buffer上,再从buffer里获取内容数据;如果我们想将数据写到通道上,我们需要先把数据写到Buffer上,再调用通道上的write方法写到通道上,由此可见,buffer很好的启到了数据缓冲的作用。
Buffer主要有position、limit、capactity属性,position记录的是当前可操作(读或写)的下一个位置,limit表示的是这个buffer总共可操作的数量,capactity表示的是这个buffer的容量。比如新申请一个容量为1024的ByteBuffer,初始化之后的buffer是写模式,即position=0,limit=capactity=1024;当往buffer写了两个字节之后,position=2,limit还是等于capactity等于1024,这时候调用Buffer的flip()方法表示将当前buffer从写模式切换为读模式,limit=position=2表示可以读的个数(往里写了多少个就可以读多少个),将position重新置为0从第一位开始读,capacticy还是等于1024;将buffer的2个字节数据都写到channel之后,position=2,limit也是等于上一步flip()方法之后的值也为2,capacticy还是等于1024,将buffer的数据全部写到通道之后我们一般会调用buffer的clear()方法,重新将position置为0,limit=capactity,相当于重新初始化buffer,将其切换为写模式。
举个简单的SocketChannel/ServerSocketChannel例子,先不结合Selector使用:
NIO TCP客户端
// open 一个socketChannel SocketChannel socketChannel = SocketChannel.open(); SocketAddress address = new InetSocketAddress("127.0.0.1", 8091); // 连接到服务器 socketChannel.connect(address); ByteBuffer buffer = ByteBuffer.allocate(1024); // 设置为非阻塞模式 socketChannel.configureBlocking(false); // 如果未连接,等待连接完成再做 while (!socketChannel.finishConnect()) { } String msg = "Hello I'm coming!"; // 将msg写到buffer中 buffer.put(msg.getBytes()); // 将buffer的写模式切换为读模式 buffer.flip(); // 每次往channel写的数据不确定,只要buffer还有未写的数据就继续写 while (buffer.hasRemaining()) { socketChannel.write(buffer); } // 清空buffer buffer.clear(); // 关闭socketChannel 输出通道,对应的服务端才能读到文件末尾 socketChannel.shutdownOutput(); int i = 0; StringBuffer sb = new StringBuffer(); // 读取socketChannel上的数据,将其写到buffer中 while ((i = socketChannel.read(buffer)) != -1) { // socketChannel read()方法返回值表示读到了多少个字节,如果返回-1表示读到了文件末尾 if (i != 0) { byte[] array = buffer.array(); sb.append(new String(array, 0, i)); // 每次写完一次,清空buffer buffer.clear(); } } System.out.println(sb); socketChannel.close();
NIO TCP服务端(CHannel + Buffer实现)
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); SocketAddress address = new InetSocketAddress("127.0.0.1", 8091); // 绑定端口号地址 serverSocketChannel.bind(address); // 设置为非阻塞 serverSocketChannel.configureBlocking(false); // 循环监听多个连接,方法是阻塞的,每个时刻只能处理一个连接 while (true) { // 每个新进来的连接都创建一个SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); ByteBuffer buffer = ByteBuffer.allocate(1024); // 如果ServerSocketChannel是非阻塞的,accept可能返回空 if (socketChannel != null) { StringBuffer sb = new StringBuffer(); int i = 0; while ((i = socketChannel.read(buffer)) > 0) { byte[] array = buffer.array(); sb.append(new String(array, 0, i)); // 每次读完需要清空 buffer.clear(); } System.out.println(sb); String msg = "Welcome to NIO!"; // 清空,相当于重新初始化 buffer.clear(); buffer.put(msg.getBytes()); // 将buffer的写模式切换为读模式 buffer.flip(); // 一次写不能保证将buffer的内容都写到channel中,所以需要判断只要还有未写字节就接着写 while (buffer.hasRemaining()) { socketChannel.write(buffer); } socketChannel.shutdownOutput(); socketChannel.close(); } }
Selector介绍:
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个Channel,从而管理多个网络连接。仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,尽量保证使用的线程最少,性能最优。
我认为NIO的难点在于如何使用Selector,如何管理Selector以及让对应的Channel注册到Selector上。
下面用一个完整的例子说明如何创建Selector,如何将通道注册到Selector,将上面服务端改造一下,用上Selector。
Java NIO 服务端 Selector + Channel + Buffer实现:
// 创建选择器 Selector selector = Selector.open(); // 创建ServerSocketChannel ServerSocketChannel serverSocket = ServerSocketChannel.open(); SocketAddress address = new InetSocketAddress("127.0.0.1", 8091); // 绑定地址 serverSocket.bind(address); // 必须设置为非阻塞模式 serverSocket.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); // 将channel注册到selector,因为是服务端的serverSocketChannel,所以监听的事件是接受就绪事件 // 一个server socket channel准备好接收新进入的连接称为“接收就绪” serverSocket.register(selector, SelectionKey.OP_ACCEPT, buffer); // 轮询 while (true) { // 如果没有连接通道就绪,轮询selector,监听准备好的通道 while (selector.select() < 0) { } // 获取准备好的SelectedKeys Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 一个server socket channel准备好接收新进入的连接称为“接收就绪” if (selectionKey.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel(); // 如果是连接就绪,新进来的连接将会创建一个SocketChannel SocketChannel accept = serverChannel.accept(); if (accept != null) { // 必须设置为非阻塞,不然不能注册到selector accept.configureBlocking(false); // 监听读和写事件 accept.register(selectionKey.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } // 写就绪,将数据写到channel中 if (selectionKey.isWritable()) { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer tempBuffer = ByteBuffer.allocate(1024); tempBuffer.put("Server Msg !!!".getBytes()); tempBuffer.flip(); try { while (tempBuffer.hasRemaining()) { channel.write(tempBuffer); } } catch (Exception e) { } channel.shutdownOutput(); } // 读就绪,读取通道上的数据将其写到buffer if (selectionKey.isReadable()) { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer tempBuffer = ByteBuffer.allocate(1024); StringBuffer sb = new StringBuffer(); int i = 0; while ((i = channel.read(tempBuffer)) != -1) { if (i != 0) { byte[] array = tempBuffer.array(); sb.append(new String(array, 0, i)); tempBuffer.clear(); } } if (sb.length() > 0) { System.out.println(sb); } } if (selectionKey.isConnectable()) { System.out.println("selectionKey.isConnectable()"); } iterator.remove(); } }
上面的客户端也可进行改造,但是我感觉没多大必要,场景过于简单没必要去轮询Selector,要是单纯的一直轮询Selector,容易造成cpu使用率100%导致电脑卡顿,为了学习,这里稍加升级一下,供大家学习参考。
Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(); SocketAddress address = new InetSocketAddress("127.0.0.1", 8091); socketChannel.connect(address); // 设置为非阻塞模式 socketChannel.configureBlocking(false); // 注册多个事件,事件之间用|连接 socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE, ByteBuffer.allocate(1024)); int p = 0; int q = 0; while (true) { // 如果没有连接通道就绪,将会一直轮询 while (selector.select() < 0) { } Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 某个channel成功连接到另一个服务器称为“连接就绪”。 // 不知道在什么时间点会进来,一直没执行 if (selectionKey.isConnectable()) { System.out.println("selectionKey.isConnectable()"); } if (selectionKey.isWritable()) { doWrite(selectionKey); p++; } if (selectionKey.isReadable()) { doRead(selectionKey); q++; } iterator.remove(); } // 这里读写只操作一次,一直轮询会消耗cpu,造成cpu使用率100% if (p > 0 && q > 0) { break; } } } public static void doWrite(SelectionKey selectionKey) throws Exception { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer tempBuffer = ByteBuffer.allocate(1024); tempBuffer.put("Client Msg ,do it better!".getBytes()); tempBuffer.flip(); try { while (tempBuffer.hasRemaining()) { channel.write(tempBuffer); } } catch (Exception e) { } channel.shutdownOutput(); } public static void doRead(SelectionKey selectionKey) throws Exception { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer tempBuffer = ByteBuffer.allocate(1024); StringBuffer sb = new StringBuffer(); int i = 0; while ((i = channel.read(tempBuffer)) != -1) { if (i != 0) { byte[] array = tempBuffer.array(); sb.append(new String(array, 0, i)); tempBuffer.clear(); } } if (sb.length() > 0) { System.out.println(sb); } }
分析一下上面的步骤,主要包括Selector的创建;Channel的创建;将Channel注册到对应的Selector中,监听感兴趣的事件;轮询Selector,找到就绪的事件,拿到通道进行对应的操作比如读写操作。
到这里Java NIO的基本方法使用演示完毕,您应该知道如何正确使用Selector/Buffer/Channel。当然这只是你使用NIO的第一步,实际的开发应用场景远比这复杂,但始终都脱离不了这些基础。
相比于普通IO,NIO的性能要好的多,其中一个原因就是NIO是非阻塞的,利用Selector一个线程可以管理多个通道可以提高CPU的使用率。还有就是,ByteBuffer.allocateDirector()分配的内存使用的是本机内存而不是Java堆上的内存,每一次分配内存时会调用操作系统的os::malloc()函数,直接ByteBuffer产生的数据如果和网络或者磁盘交互都在操作系统的内核空间中发生,不需要将数据复制到Java内存中,很显然执行这种IO操作要比一般的从操作系统的内核空间到Java堆上的切换操作快得多,因为它们可以避免在Java堆与本机堆之间复制数据。