NIO
1. BIO:
Block IO, 同步阻塞IO, 主要应用于文件IO ( stream流的方式) 和网络IO ( socket)
2. NIO:
None- Block IO, 非阻塞IO, jdk1. 4 提供的新特性; NIO主要有三个核心分: Selector, Channel, Buffer;
由Selector监听多个通道事件, 数据从通道输出到缓冲区, 或者从缓冲区输入入到通道中;
3. AIO:
Asynchronous I/ O, 异步非阻塞IO
4. 常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。Channel 提供从文件、网络读取数据的渠道, 但是读取或写入的数据都必须经由 Buffer
5. NIO模式原理:
首先ServerSocketChannel将自己的监听客户端请求SelectionKEY. OP_ACCECPT事件注册到Selector
选择器中, 每当客户端请求一次SocketChannel连接, Selector监听到事件, 便调用
serverSocketChannel. accept ( ) 方法获得客户端请求的SocketChannel对象( 不同客户端发送不同
SocketChannel) , 当客户端继续发送消息, 触发Selector监听的OP_READ事件, 再通过
SelectionKey. channel ( ) 获取触发当前事件的SocketChannel对象; selector. keys ( ) 保存所有的Channel对象, selector. selectedKeys ( ) 保存当前触发事件的对象, 它们都是Set< SelectionKey>
6. NIO的同步非阻塞性:
NIO的非阻塞性体现在, 无论是服务器端监听SocketChannel连接, 监听输入输出, 还是客户端请求SocketChannel连接, 这些过程都是非阻塞的; NIO的同步性体现在, 服务端监听事件, 客户端接收服务器端发送的消息, 都离不开while ( true ) 轮询来查询
7. IO对比总结
IO 的方式通常分为几种:同步阻塞的 BIO、同步非阻塞的 NIO、异步非阻塞的 AIO。
* BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并
发局限于应用中,JDK1. 4 以前的唯一选择,但程序直观简单易理解。
* NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局
限于应用中,编程比较复杂,JDK1. 4 开始支持。
* AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调
用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
NIO API
1.文件NIO
* ByteBuffer类(二进制数据),该类的主要方法如下所示:
public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
public abstract byte[] get(); 从缓冲区获得字节数据
public final byte[] array(); 把缓冲区数据转换成字节数组
public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
public final Buffer flip(); 翻转缓冲区,重置位置到初始位置
* FileChannel 类,该类主要用来对本地文件进行 IO 操作,主要方法如下所示:
public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道
中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target),把数据从当
前通道复制给目标通道
2.网络NIO
* 2.1 Selector(选择器),能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。
public static Selector open(),得到一个选择器对象
public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的
SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public Set< SelectionKey> selectedKeys(),从内部集合中得到所有的就绪SelectionKey
public Set< SelectionKey> keys(),从内部集合中得到所有的SelectionKey(包含就绪和未就绪)
* 2.2 SelectionKey,代表了 Selector 和网络通道的注册关系,一共四种:
int OP_ACCEPT:有新的网络连接可以 accept,值为 16
int OP_CONNECT:代表连接已经建立,值为 8
int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4
public abstract Selector selector(),得到与之关联的 Selector 对象
public abstract SelectableChannel channel(),得到与之关联的通道
public final Object attachment(),得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops),设置或改变监听事件
public final boolean isAcceptable(),是否可以accept
public final boolean isReadable(),是否可以读
public final boolean isWritable(),是否可以写
* 2.3 ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示:
public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模
式,取值 false 表示采用非阻塞模式
public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
* 2.4 SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 总是把缓冲区的数据写入通
道,或者把通道里的数据读到缓冲区。常用方法如下所示:
public static SocketChannel open(),得到一个 SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模
式,取值 false 表示采用非阻塞模式
public boolean connect(SocketAddress remote),连接服务器
public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public int write(ByteBuffer src),往通道里写数据
public int read(ByteBuffer dst),从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择
器并设置监听事件,最后一个参数可以设置共享数据
public final void close(),关闭通道
NIO API 测试
1. 文件NIO
public class TestFileNio {
@Test
public void testWrite ( ) throws Exception{
FileOutputStream fos = new FileOutputStream ( "testNIO.txt" ) ;
FileChannel channel = fos. getChannel ( ) ;
ByteBuffer buffer = ByteBuffer. allocate ( 1024 ) ;
buffer. put ( "Hello NIO..." . getBytes ( ) ) ;
buffer. flip ( ) ;
channel. write ( buffer) ;
fos. close ( ) ;
}
@Test
public void testRead ( ) throws Exception{
File file = new File ( "testNIO.txt" ) ;
FileInputStream fis = new FileInputStream ( file) ;
FileChannel channel = fis. getChannel ( ) ;
ByteBuffer buffer = ByteBuffer. allocate ( ( int ) file. length ( ) ) ;
channel. read ( buffer) ;
System. out. println ( new String ( buffer. array ( ) ) ) ;
fis. close ( ) ;
}
@Test
public void testTransfer ( ) throws Exception{
FileInputStream fis = new FileInputStream ( "testNIO.txt" ) ;
FileOutputStream fos = new FileOutputStream ( "transferNIO.txt" ) ;
FileChannel readChannel = fis. getChannel ( ) ;
FileChannel writeChannel = fos. getChannel ( ) ;
writeChannel. transferFrom ( readChannel, 0 , readChannel. size ( ) ) ;
fos. close ( ) ;
fis. close ( ) ;
}
}
注意:
1. buffer. flip ( ) 方法表示翻转缓冲区, 重置指针( position) 到初始位置; 由于通道是双向的, 当往缓冲区
中put数据后, 缓冲区内部指针会记录缓冲区字节数, 因此在将缓冲区数据通过通道输出前必须重置缓冲区内部的
position为初始值; 否则管道输出的数据就会从position后开始输出, 结果输出失败
2. NIO的transferFrom ( ) / transferTo ( ) 方法处理大型数据也非常的高效
2. 网络NIO
public class TestWebNIOServer {
public static void main ( String[ ] args) throws Exception {
Selector selector = Selector. open ( ) ;
ServerSocketChannel serverSocketChannel = ServerSocketChannel. open ( ) ;
serverSocketChannel. configureBlocking ( false ) ;
serverSocketChannel. bind ( new InetSocketAddress ( 8888 ) ) ;
serverSocketChannel. register ( selector, SelectionKey. OP_ACCEPT) ;
while ( true ) {
if ( selector. select ( 2000 ) == 0 ) {
System. out. println ( "当前没有连接..." ) ;
continue ;
}
System. out. println ( "检测到了连接..." ) ;
Iterator< SelectionKey> it = selector. selectedKeys ( ) . iterator ( ) ;
while ( it. hasNext ( ) ) {
SelectionKey key = it. next ( ) ;
if ( key. isAcceptable ( ) ) {
SocketChannel socketChannel = serverSocketChannel. accept ( ) ;
socketChannel. configureBlocking ( false ) ;
ByteBuffer buffer1 = ByteBuffer. allocate ( 1024 ) ;
socketChannel. register ( selector, SelectionKey. OP_READ, buffer1) ;
}
if ( key. isReadable ( ) ) {
SocketChannel socketChannel = ( SocketChannel) key. channel ( ) ;
ByteBuffer buffer2 = ( ByteBuffer) key. attachment ( ) ;
socketChannel. read ( buffer2) ;
System. out. println ( "客户端数据:" + new String ( buffer2. array ( ) ) ) ;
}
it. remove ( ) ;
}
}
}
}
public class TestWebNIOClient {
public static void main ( String[ ] args) throws Exception {
SocketChannel socketChannel = SocketChannel. open ( ) ;
socketChannel. configureBlocking ( false ) ;
InetSocketAddress address = new InetSocketAddress ( "127.0.0.1" , 8888 ) ;
if ( ! socketChannel. connect ( address) ) {
while ( ! socketChannel. finishConnect ( ) ) {
System. out. println ( "客户端发起连接,如果多次连接都连不上就会抛异常..." ) ;
}
}
ByteBuffer buffer = ByteBuffer. allocate ( 1024 ) ;
buffer. put ( "Hello Server..." . getBytes ( ) ) ;
socketChannel. write ( buffer) ;
System. in. read ( ) ;
}
}
注意:
1. 使用NIO进行通讯的时候, 管道会将缓冲区的所有数据都输入输出, 因此会出现很多空格乱码, 最好对所有的字符串使用str. trim ( ) ;
2. ServerSocketChannel相当于BIO中的ServerSocket, 每当客户端请求一次SocketChannel连接, 都会通过serverSocketChannel. accept ( ) 方法获得一个新的SocketChannel对象;
案例:NIO聊天室
public class TestChatServer {
private Selector selector = null;
private ServerSocketChannel listenerChannel = null;
private int port = 9999 ;
public TestChatServer ( ) throws Exception {
selector = Selector. open ( ) ;
listenerChannel = ServerSocketChannel. open ( ) ;
listenerChannel. configureBlocking ( false ) ;
listenerChannel. bind ( new InetSocketAddress ( port) ) ;
listenerChannel. register ( selector, SelectionKey. OP_ACCEPT) ;
}
private void start ( ) throws Exception {
while ( true ) {
if ( selector. select ( 2000 ) == 0 ) {
System. out. println ( "服务器等待连接..." ) ;
continue ;
}
Iterator< SelectionKey> it = selector. selectedKeys ( ) . iterator ( ) ;
while ( it. hasNext ( ) ) {
SelectionKey key = it. next ( ) ;
if ( key. isAcceptable ( ) ) {
SocketChannel socketChannel = listenerChannel. accept ( ) ;
socketChannel. configureBlocking ( false ) ;
socketChannel. register ( selector, SelectionKey. OP_READ, ByteBuffer. allocate ( 1024 ) ) ;
System. out. println ( socketChannel. getRemoteAddress ( ) . toString ( ) + "上线..." ) ;
}
if ( key. isReadable ( ) ) {
SocketChannel socketChannel = ( SocketChannel) key. channel ( ) ;
ByteBuffer buffer = ( ByteBuffer) key. attachment ( ) ;
int count = socketChannel. read ( buffer) ;
if ( count > 0 ) {
String msg = new String ( buffer. array ( ) , "UTF-8" ) ;
printlnMsg ( msg. trim ( ) ) ;
broadcast ( msg, socketChannel) ;
}
}
it. remove ( ) ;
}
}
}
private void broadcast ( String msg, SocketChannel exclude) throws Exception {
for ( SelectionKey key : selector. keys ( ) ) {
Channel channel = key. channel ( ) ;
if ( channel instanceof SocketChannel && channel != exclude) {
SocketChannel socketChannel = ( SocketChannel) channel;
socketChannel. write ( ByteBuffer. wrap ( msg. getBytes ( ) ) ) ;
System. out. println ( "发送一个channel:" + socketChannel. getRemoteAddress ( ) . toString ( ) ) ;
}
}
}
private void printlnMsg ( String msg) {
SimpleDateFormat sdf = new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ) ;
System. out. println ( sdf. format ( new Date ( ) ) + " --> \n" + msg) ;
}
public static void main ( String[ ] args) throws Exception {
new TestChatServer ( ) . start ( ) ;
}
}
public class TestChatClient {
private String ip = "127.0.0.1" ;
private int port = 9999 ;
private SocketChannel socketChannel = null;
private InetSocketAddress address = null;
private String username;
public TestChatClient ( ) throws Exception {
socketChannel = SocketChannel. open ( ) ;
socketChannel. configureBlocking ( false ) ;
address = new InetSocketAddress ( ip, port) ;
if ( ! socketChannel. connect ( address) )
while ( ! socketChannel. finishConnect ( ) ) ;
username = socketChannel. getLocalAddress ( ) . toString ( ) ;
System. out. println ( username + "连接成功..." ) ;
}
public void sendMsg ( String msg) throws Exception {
if ( "bye" . equalsIgnoreCase ( msg) ) {
socketChannel. close ( ) ;
return ;
}
msg += username + " say: " + msg;
socketChannel. write ( ByteBuffer. wrap ( msg. trim ( ) . getBytes ( ) ) ) ;
}
public void receiveMsg ( ) throws Exception {
ByteBuffer buffer = ByteBuffer. allocate ( 1024 ) ;
int count = socketChannel. read ( buffer) ;
if ( count > 0 ) {
System. out. println ( new String ( buffer. array ( ) , "UTF-8" ) . trim ( ) ) ;
}
}
public static void main ( String[ ] args) throws Exception {
final TestChatClient client = new TestChatClient ( ) ;
new Thread ( ) {
@Override
public void run ( ) {
try {
while ( true ) {
client. receiveMsg ( ) ;
Thread. sleep ( 3000 ) ;
}
} catch ( Exception e) {
e. printStackTrace ( ) ;
}
}
} . start ( ) ;
Scanner sc = new Scanner ( System. in) ;
while ( sc. hasNextLine ( ) ) {
String msg = sc. nextLine ( ) ;
client. sendMsg ( msg. trim ( ) ) ;
}
}
}