NIO
一.多线程编程
1.1 基本知识回顾
线程是比进程更小的能独立运行的基本单位,它是进程的一部分,一个进程可以拥有多
个线程,但至少要有一个线程,即主执行线程(Java 的 main 方法)。我们既可以编写单线程 应用,也可以编写多线程应用。
一个进程中的多个线程可以并发(同时)执行,在一些执行时间长、需要等待的任务上(例
如:文件读写和网络传输等),多线程就比较有用了。
怎么理解多线程呢?来两个例子:
1. 进程就是一个工厂,一个线程就是工厂中的一条生产线,一个工厂至少有一条生产
线,只有一条生产线就是单线程应用,拥有多条生产线就是多线程应用。多条生产线可
以同时运行。
2. 我们使用迅雷可以同时下载多个视频,迅雷就是进程,多个下载任务就是线程,这
几个线程可以同时运行去下载视频。
多线程可以共享内存、充分利用 CPU,通过提高资源(内存和 CPU)使用率从而提高程序
的执行效率。CPU 使用抢占式调度模式在多个线程间进行着随机的高速的切换。对于 CPU
的一个核而言,某个时刻,只能执行一个线程,而 CPU 在多个线程间的切换速度相对我们
的感觉要快很多,看上去就像是多个线程或任务在同时运行。
Java 天生就支持多线程并提供了两种编程方式,一个是继承 Thread 类,一个是实现
Runnable 接口,接下来咱们通过两个案例快速复习回顾一下。
方式一:继承Thread
方式二:实现Runnable
多线程会产生安全问题,,所以用加锁的方式(同步代码块和同步方法),比较形象的生产者消费者模式
二.BIO 编程
BIO 有的称之为 basic(基本) IO,有的称之为 block(阻塞) IO,主要应用于文件 IO 和网络 IO,
这里不再说文件 IO, 大家对此都非常熟悉,本次课程主要讲解网络 IO。
在 JDK1.4 之前,我们建立网络连接的时候只能采用 BIO,需要先在服务端启动一个
ServerSocket,然后在客户端启动 Socket 来对服务端进行通信,默认情况下服务端需要对每
个请求建立一个线程等待请求,而客户端发送请求后,先咨询服务端是否有线程响应,如果 没有则会一直等待或者遭到拒绝,如果有的话,客户端线程会等待请求结束后才继续执行, 这就是阻塞式 IO。具体代码就不讲了,在java基础中有详细介绍。
三.NIO 编程
3.1 概述
java.nio 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了 一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO)。新增了许多用于处理输入输出 的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增 了满足 NIO 的功能。Nio都在java.nio包下。
NIO 和 BIO 有着相同的目的和作用,但是它们的实现方式完全不同,BIO 以流的方式处 理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多。另外,NIO 是非阻塞式的, 这一点跟 BIO 也很不相同,使用它可以提供非阻塞式的高伸缩性网络。
NIO 主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。传统的 BIO
基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据 总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通 道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
3.2 文件 IO
3.2.1 概述和核心 API
缓冲区(Buffer):实际上是一个容器,是一个特殊的数组,缓冲区对象内置了一些机
制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道, 但是读取或写入的数据都必须经由 Buffer,如下图所示:
在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,常用的 Buffer 子类有:
l ByteBuffer,存储字节数据到缓冲区
l ShortBuffer,存储字符串数据到缓冲区
l CharBuffer,存储字符数据到缓冲区
l IntBuffer,存储整数数据到缓冲区
l LongBuffer,存储长整型数据到缓冲区
l DoubleBuffer,存储小数到缓冲区
l FloatBuffer,存储小数到缓冲区
对于 Java 中的基本数据类型,都有一个 Buffer 类型与之相对应,最常用的自然是
ByteBuffer 类(二进制数据),该类的主要方法如下所示:
l public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
l public abstract byte[] get(); 从缓冲区获得字节数据
l public final byte[] array(); 把缓冲区数据转换成字节数组
l public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
l public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
l public final Buffer flip(); 翻转缓冲区,重置位置到初始位置
通道(Channel):类似于 BIO 中的 stream,例如 FileInputStream 对象,用来建立到目 标(文件,网络套接字,硬件设备等)的一个连接,但是需要注意:BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的, 既可以用来进行读操作,也可以用来进行写操作。常用的 Channel 类有:FileChannel、 DatagramChannel、ServerSocketChannel 和 SocketChannel。FileChannel 用于文件的数据读写, DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的 数据读写。
这里我们先讲解 FileChannel 类,该类主要用来对本地文件进行 IO 操作,主要方法如下所示:
l public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
l public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
l public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道
中复制数据到当前通道
l public long transferTo(long position, long count, WritableByteChannel target),把数据从当
前通道复制给目标通道
3.2.2 案例
接下来我们通过 NIO 实现几个案例,分别演示一下本地文件的读、写和复制操作,并
和 BIO 做个对比。
1. 往本地文件中写数据
@Test
public void test1() throws Exception{
String str="hello,nio,我是zy";
FileOutputStream fos=new FileOutputStream("basic.txt");
FileChannel fc=fos.getChannel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put(str.getBytes());
buffer.flip();
fc.write(buffer);
fos.close();
}NIO 中的通道是从输出流对象里通过 getChannel 方法获取到的,该通道是双向的,既可
以读,又可以写。在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中,然 后通过通道的 write 方法写数据。在 write 之前,需要调用 flip 方法翻转缓冲区,把内部重置 到初始位置,这样在接下来写数据时才能把所有数据写到通道里。运行效果如下图所示:
2. 从本地文件中读数据
@Test
public void test2() throws Exception{
File file=new File("basic.txt");
FileInputStream fis=new FileInputStream(file);
FileChannel fc=fis.getChannel();
ByteBuffer buffer=ByteBuffer.allocate((int)file.length());
fc.read(buffer);
System.out.print(new String(buffer.array()));
fis.close();
}
@Test
public void test1() throws Exception{
String str="hello,nio,我是博学谷";
FileOutputStream fos=new FileOutputStream("basic.txt");
FileChannel fc=fos.getChannel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put(str.getBytes());
buffer.flip();
fc.write(buffer);
fos.close();
}
NIO 中的通道是从输出流对象里通过 getChannel 方法获取到的,该通道是双向的,既可
以读,又可以写。在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中,然
后通过通道的 write 方法写数据。在 write 之前,需要调用 flip 方法翻转缓冲区,把内部重置
到初始位置,这样在接下来写数据时才能把所有数据写到通道里。运行效果如下图所示:
2. 从本地文件中读数据
@Test
public void test2() throws Exception{
File file=new File("basic.txt");
FileInputStream fis=new FileInputStream(file);
FileChannel fc=fis.getChannel();
ByteBuffer buffer=ByteBuffer.allocate((int)file.length());
fc.read(buffer);
System.out.print(new String(buffer.array()));
fis.close();
}
上述代码从输入流中获得一个通道,然后提供 ByteBuffer 缓冲区,该缓冲区的初始容量
和文件的大小一样,最后通过通道的 read 方法把数据读取出来并存储到了 ByteBuffer 中。
3. 复制文件
l
通过 BIO 复制一个视频文件,代码如下所示:
FileInputStream fis=new FileInputStream("C:\\Users\\zdx\\Desktop\\oracle.mov");
FileOutputStream fos=new FileOutputStream("d:\\oracle.mov");
byte[] b=new byte[1024];
while (true) {
int res=fis.read(b);
if(res==-1){
break;
}
fos.write(b,0,res);
}
fis.close();
fos.close();
上述代码分别通过输入流和输出流实现了文件的复制,这是通过传统的 BIO 实现的,大
家都比较熟悉,不再多说。
通过 NIO 复制相同的视频文件,代码如下所示:
@Test
public void test4() throws Exception{
FileInputStream fis=new FileInputStream("C:\\Users\\zdx\\Desktop\\oracle.mov");
FileOutputStream fos=new FileOutputStream("d:\\oracle.mov");
FileChannel sourceCh = fis.getChannel();
FileChannel destCh = fos.getChannel();
destCh.transferFrom(sourceCh, 0, sourceCh.size());
sourceCh.close();
destCh.close();
}
上述代码分别从两个流中得到两个通道,sourceCh 负责读数据,destCh 负责写数据,然
后直接调用 transferFrom 方法一步到位实现了文件复制。
3.3 网络 IO
3.3.1 概述和核心 API
前面在进行文件 IO 时用到的 FileChannel 并不支持非阻塞操作,学习 NIO 主要就是进行
网络 IO,Java NIO 中的网络通道是非阻塞 IO 的实现,基于事件驱动,非常适用于服务器需
要维持大量连接,但是数据交换量不大的情况,例如一些即时通信的服务等等....
在 Java 中编写 Socket 服务器,通常有以下几种模式:
l
一个客户端连接用一个线程,优点:程序编写简单;缺点:如果连接非常多,分配的线
程也会非常多,服务器可能会因为资源耗尽而崩溃。
l
把每一个客户端连接交给一个拥有固定数量线程的连接池,优点:程序编写相对简单,
可以处理大量的连接。确定:线程的开销非常大,连接如果非常多,排队现象会比较严
使用 Java 的 NIO用非阻塞的 IO 方式处理。这种模式可以用一个线程,处理大量的客
户端连接。
1. Selector(选择器),能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获
取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也
就是管理多个连接。这样使得只有在连接真正有读写事件发生时,才会调用函数来进行读写,
就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并
且避免了多线程之间的上下文切换导致的开销。
该类的常用方法如下所示:
l public static Selector open(),得到一个选择器对象
l public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将 对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
l public Set<SelectionKey> selectedKeys(),从内部集合中得到所有的 SelectionKey
2. SelectionKey,代表了 Selector 和网络通道的注册关系,一共四种:
l int OP_ACCEPT:有新的网络连接可以 accept,值为 16
l int OP_CONNECT:代表连接已经建立,值为 8
l int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4
该类的常用方法如下所示:
l public abstract Selector selector(),得到与之关联的 Selector 对象
l public abstract SelectableChannel channel(),得到与之关联的通道
l public final Object attachment(),得到与之关联的共享数据
l public abstract SelectionKey interestOps(int ops),设置或改变监听事件
l public final boolean isAcceptable(),是否可以 accept
l public final boolean isReadable(),是否可以读
l public final boolean isWritable(),是否可以写
3. ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示:
l public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
l public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
l public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞式,
取值 false 表示采用非阻塞模式
l public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
l public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
4. SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 总是把缓冲区的数据写入通 道,或者把通道里的数据读到缓冲区。常用方法如下所示:
l public static SocketChannel open(),得到一个 SocketChannel 通道
l public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式, 取值 false 表示采用非阻塞模式
l public boolean connect(SocketAddress remote),连接服务器
l public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成
连接操作
l public int write(ByteBuffer src),往通道里写数据
l public int read(ByteBuffer dst),从通道里读数据
l public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择器并设置
监听事件,最后一个参数可以设置共享数据
l public final void close() 关闭通道
3.3.2 入门案例
API 学习完毕后,接下来我们使用 NIO 开发一个入门案例,实现服务器端和客户端之间
的数据通信(非阻塞)。
//网络服务器端程序
public class NIOServer {
public static void main(String[] args) throws Exception{
//1. 得到一个 ServerSocketChannel 对象 老大
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
//2. 得到一个 Selector 对象
间谍
Selector selector=Selector.open();
//3. 绑定一个端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//4. 设置非阻塞方式
serverSocketChannel.configureBlocking(false);
//5. 把 ServerSocketChannel 对象注册给 Selector 对象
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 干活
while(true){
//6.1 监控客户端
if(selector.select(2000)==0){ //nio 非阻塞式的优势
System.out.println("Server:没有客户端搭理我,我就干点别的事");
continue;
}
//6.2 得到 SelectionKey,判断通道里的事件
Iterator<SelectionKey> keyIterator=selector.selectedKeys().iterator();
while(keyIterator.hasNext()){
SelectionKey key=keyIterator.next();
if(key.isAcceptable()){ //客户端连接请求事件
System.out.println("OP_ACCEPT");
SocketChannel socketChannel=serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if(key.isReadable()){ //读取客户端数据事件
SocketChannel channel=(SocketChannel) key.channel();
ByteBuffer buffer=(ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("客户端发来数据:"+new String(buffer.array()));
}
// 6.3 手动从集合中移除当前 key,防止重复处理
keyIterator.remove(); } } } }
上面代码用 NIO 实现了一个服务器端程序,能不断接受客户端连接并读取客户端发过来的
数据。
//网络客户端程序
public class NIOClient {
public static void main(String[] args) throws Exception{
//1. 得到一个网络通道
SocketChannel channel=SocketChannel.open();
//2. 设置非阻塞方式
channel.configureBlocking(false);
//3. 提供服务器端的 IP 地址和端口号
InetSocketAddress address=new InetSocketAddress("127.0.0.1",9999);
//4. 连接服务器端
if(!channel.connect(address)){
while(!channel.finishConnect()){ //nio 作为非阻塞式的优势
System.out.println("Client:连接服务器端的同时,我还可以干别的一些事情");
}
}
//5. 得到一个缓冲区并存入数据
String msg="hello,Server";
ByteBuffer writeBuf = ByteBuffer.wrap(msg.getBytes());
//6. 发送数据
channel.write(writeBuf);
System.in.read();
}
}
3.3.3 网络聊天案例
刚才我们通过 NIO 实现了一个入门案例,基本了解了 NIO 的工作方式和运行流程,接
下来我们用 NIO 实现一个多人聊天案例,具体代码如下所示:
public class ChatServer {
private Selector selector;
private ServerSocketChannel listenerChannel;
private static final int PORT = 9999; //服务器端口
public ChatServer() {
try {
// 得到选择器
selector = Selector.open();
// 打开监听通道
listenerChannel = ServerSocketChannel.open();
// 绑定端口
listenerChannel.bind(new InetSocketAddress(PORT));
// 设置为非阻塞模式
listenerChannel.configureBlocking(false);
// 将选择器绑定到监听通道并监听 accept 事件
listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("Chat Server is ready.......");
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
try {
while (true) { //不停轮询
int count = selector.select();//获取就绪 channel
if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 监听到 accept
if (key.isAcceptable()) {
SocketChannel sc = listenerChannel.accept();
//非阻塞模式
sc.configureBlocking(false);
//注册到选择器上并监听 read
sc.register(selector, SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress().toString().substring(1)+"上线了...");
//将此对应的 channel 设置为 accept,接着准备接受其他客户端请求
key.interestOps(SelectionKey.OP_ACCEPT);
}
//监听到 read
if (key.isReadable()) {
readMsg(key); //读取客户端发来的数据
}
//一定要把当前 key 删掉,防止重复处理
iterator.remove();
}
} else {
System.out.println("独自在寒风中等候...");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void readMsg(SelectionKey key) {
SocketChannel channel = null;
try {
// 得到关联的通道
channel = (SocketChannel) key.channel();
//设置 buffer 缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//从通道中读取数据并存储到缓冲区中
int count = channel.read(buffer);
//如果读取到了数据
if (count > 0) {
//把缓冲区数据转换为字符串
String msg = new String(buffer.array());
printInfo(msg);
//将关联的 channel 设置为 read,继续准备接受数据
key.interestOps(SelectionKey.OP_READ);
BroadCast(channel, msg); //向所有客户端广播数据
}
buffer.clear();
} catch (IOException e) {
try {
//当客户端关闭 channel 时,进行异常如理
printInfo(channel.getRemoteAddress().toString().substring(1) + "下线了...");
key.cancel(); //取消注册
channel.close(); //关闭通道
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
public void BroadCast(SocketChannel except, String msg) throws IOException {
System.out.println("发送广播...");
//广播数据到所有的 SocketChannel 中
for (SelectionKey key : selector.keys()) {
Channel targetchannel = key.channel();
//排除自身
if (targetchannel instanceof SocketChannel && targetchannel != except) {
SocketChannel dest = (SocketChannel) targetchannel;
//把数据存储到缓冲区中
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//往通道中写数据
dest.write(buffer);
}
}
}
private void printInfo(String str) { //往控制台打印消息
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("[" + sdf.format(new Date()) + "] -> " + str);
}
public static void main(String[] args) {
ChatServer server = new ChatServer();
server.start();
} }
上述代码使用 NIO 编写了一个聊天程序的服务器端,可以接受客户端发来的数据,并能把
数据广播给所有客户端。
public class ChatClient {
private final String HOST = "127.0.0.1"; //服务器地址
private int PORT = 9999; //服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String userName;
public ChatClient() throws IOException {
//得到选择器
selector = Selector.open();
//连接远程服务器
socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//注册选择器并设置为 read
socketChannel.register(selector, SelectionKey.OP_READ);
//得到客户端 IP 地址和端口信息,作为聊天用户名使用
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("---------------Client(" + userName + ") is ready---------------");
}
//向服务器端发送数据
public void sendMsg(String msg) throws Exception {
//如果控制台输入 bye 就关闭通道,结束聊天
if (msg.equalsIgnoreCase("bye")) {
socketChannel.close();
socketChannel = null;
return;
}
msg = userName + "说: " + msg;
try {
//往通道中写数据
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
//从服务器端接收数据
public void receiveMsg() {
try {
int readyChannels = selector.select();
if (readyChannels > 0) { //有可用通道
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = (SelectionKey) keyIterator.next();
if (sk.isReadable()) {
//得到关联的通道
SocketChannel sc = (SocketChannel) sk.channel();
//得到一个缓冲区
ByteBuffer buff = ByteBuffer.allocate(1024);
//读取数据并存储到缓冲区
sc.read(buff);
//把缓冲区数据转换成字符串
String msg = new String(buff.array());
System.out.println(msg.trim());
}
keyIterator.remove(); //删除当前 SelectionKey,防止重复处理
}
} else {
System.out.println("人呢?都去哪儿了?没人聊天啊...");
}
} catch (IOException e) {
e.printStackTrace();
} } }
上述代码通过 NIO 编写了一个聊天程序的客户端,可以向服务器端发送数据,并能接收服
务器广播的数据。
public class TestChat {
public static void main(String[] args) throws Exception {
//创建一个聊天客户端对象
ChatClient chatClient = new ChatClient();
new Thread() { //单独开一个线程不断的接收服务器端广播的数据
public void run() {
while (true) {
chatClient.receiveMsg();
try { //间隔 3 秒
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
Scanner scanner = new Scanner(System.in);
//在控制台输入数据并发送到服务器端
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
chatClient.sendMsg(msg);
}
} }
上述代码运行了聊天程序的客户端,并在主线程中发送数据,在另一个线程中不断接收服务 器端的广播数据,该代码运行一次就是一个聊天客户端,可以同时运行多个聊天客户端,
3.4 AIO 编程
JDK 7 引入了 Asynchronous I/O,即 AIO。在进行 I/O 编程中,常用到两种模式:Reactor
和 Proactor。Java 的 NIO 就是 Reactor,当有事件触发时,服务器端得到通知,进行相应的 处理。AIO 即 NIO2.0,叫做异步不阻塞的 IO。AIO 引入异步通道的概念,采用了 Proactor 模式, 简化了程序编写,一个有效的请求才启动一个线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。