完整代码查看:https://gitee.com/firewolf/java-io/tree/master/java-io 下面的java-bio、java-nio、java-aio
〇、引言
在Java中IO编程有如下几种:BIO,伪异步IO,NIO、AIO,对比如下:
一、传统的BIO
(一)BIO介绍
在基于传统同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作,连接成功后,双方通过输入流和输出流进行同步组阻塞时通讯。
通信模型图:
通常由一个Acceptor线程负责监听客户端的连接,接收到客户端请求后为每个客户端创建一个新的线程进行链路处理,处理完后通过输出流返回数据到客户端,线程销毁。
最大的问题就是缺乏弹性伸缩能力,客户端数量和服务端线程数呈现1:1的比例,会造成虚拟机线程急剧膨胀,从而导致系统效率下降。
(二)编程示例
1. 服务端
服务端主要是创建监听来监听客户端的socket请求,对于接入进来的socket创建一个单独的线程进行输入输出处理
- 启动类代码如下
package com.firewolf.java.io.bio.server;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 作者:刘兴 时间:2019/5/8
**/
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port); //1.创建socket监听,如果端口没有被占用,则会启动成功
System.out.println("时间服务器已经启动了,监听端口为" + port + "......");
Socket socket = null;
while (true) {
socket = serverSocket.accept(); //2. 循环等待客户端进行连接,没有客户端连接进来的时候在此阻塞
System.out.println("有客户端接入,");
new Thread(new TimeServerHandler(socket)).start(); //3.开启一个新的线程处理连接进来的客户端
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
}
- 处理客户端数据交互代码
package com.firewolf.java.io.bio.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;
/**
* 作者:刘兴 时间:2019/5/8
**/
public class TimeServerHandler implements Runnable {
private Socket socket;
public TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true);
String clientMsg = null;
while (true) { //1.循环接受客户端的消息
clientMsg = in.readLine();
if (clientMsg == null) {
break;
}
System.out.println("收到客户端的消息为: " + clientMsg);
out.println("当前时间为:" + new Date().toString()); //2.返回信息
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("客户端下线,释放socket....");
try {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2. 客户端
发起连接请求,发送消息,同时接受服务端返回的数据
package com.firewolf.java.io.bio.client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;
/**
* 作者:刘兴 时间:2019/5/8
**/
public class TimeClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 8080);
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
out.println("请返回系统当前时间...."); //1.往服务端发送消息
String serverMsg = in.readLine(); //2.接受服务端返回的消息
System.out.println("服务端返回的消息为: " + serverMsg);
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("请求结束,主动释放与服务端的连接....");
try {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
在高并发情况下,后端的线程数急剧增长,导致系统性能下降甚至崩溃,这是同步阻塞I/O的最大问题。
二、伪异步IO
(一)伪异步I/O介绍
为了解决同步阻塞IO面临的一个链路需要一个线程处理的问题,有人对线程模型进行了优化——后端使用线程池进行处理,形成客户端线程数M:后端线程池最大线程数为N的关系。 其中M远大于N,后端可以灵活的控制线程数量,防止由于海量并发造成的后端线程资源耗尽的问题。模型如下:
(二)编程示例
伪异步io是在BIO的基础上进行了一些改造,就是把每次来socket之后启动一个线程来执行改变成利用线程池来处理,所以这里只改变服务端的代码,客户端不进行改造。
1. 创建一个用于启动任务的线程池
package com.firewolf.java.io.pseudoasync.server;
import com.firewolf.java.io.bio.server.TimeServerHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 作者:刘兴 时间:2019/5/13
**/
public class TimeServerHandlerExecutor {
private ExecutorService executorService;
public TimeServerHandlerExecutor(int maxPoolSize, int queueSize) {
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
}
public void executeTask(TimeServerHandler handler){
executorService.execute(handler);
}
}
2.服务启动的时候创建好线程池,有了socket的时候使用线程池启动任务
package com.firewolf.java.io.pseudoasync.server;
import com.firewolf.java.io.bio.server.TimeServerHandler;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 作者:刘兴 时间:2019/5/8
**/
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port); //1.创建socket监听,如果端口没有被占用,则会启动成功
System.out.println("时间服务器已经启动了,监听端口为" + port + "......");
TimeServerHandlerExecutor timeServerHandlerExecutor = new TimeServerHandlerExecutor(20, 1000);
Socket socket = null;
while (true) {
socket = serverSocket.accept(); //2. 循环等待客户端进行连接,没有客户端连接进来的时候在此阻塞
System.out.println("有客户端接入,");
timeServerHandlerExecutor.executeTask(new TimeServerHandler(socket)); //3.使用线程池开启一个新的线程处理连接进来的客户端
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
}
伪异步IO能够防止后端服务器因为过多线程导致的资源耗尽的问题,但是,会导致前台的请求产生阻塞,从而导致系统效率依旧不高。
三、NIO
(一)NIO简介
NIO的理解大概有两种,
- New I /O :原因是NIO是Java后来新增的I/O类库;
- Non-Bolck I/O:因为NIO的目的是让Java支持非阻塞I/O(个人更倾向于这个说法)
与BIO中的ServerSocket和Socket一样,NIO中也提供了ServerSocketChannel和SocketChannel两种不同的套接字通道实现。这两种新增的通道都支持阻塞和非阻塞两种模式,阻塞模式使用简单,但是性能不好,非阻塞的刚刚好相反。
(二)NIO核心概念
1. 缓冲区Buffer
Buffer是一个对象,包含一些要写入或者读出的数据。在NIO中,所有的数据都是利用Buffer进行处理的。本质上是一个数组,一般是ByteBuffer,但是,他不仅仅是一个数组,还提供了对数据的结构化访问以及维护读和写位置(limit)等信息。ByteBuffer是最常用的缓冲区Buffer,实际上,Java针对每一种基本数据类型都有一个对应的Buffer缓冲区,这些类基本上都拥有相同的功能,只是处理的数据不同而已。
2. 通道Channel
Channel是一个通道,就像自来水管一样,网络数据使用Channel进行处理,通道和流不同的地方是,流是单向的,而通道是双向的。流只能在一个方向上移动(必须是InputStream或者是OutputStream中的一种),而通道可以用于读、写或者二者同时进行。
3.多路复用器Selector
多路复用器Selector是NIO编程的关键,它会不断的轮询注册在他上面的Channel,如果某个Channel上发生读或者写事件,这个Channel就会处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel集合, 进行IO处理。
一个Selector可以轮询多个Channel,由于jdk使用了epoll()代替了select(),所以并没有最大连接数句柄1024的限制。所以我们只需要一个线程轮询Selector,就可以接入成千上万的客户端。
(三)编程示例
这些使用异步IO编写一个能够进行群聊的小程序
1. 服务端
a. 时序图
b. 服务端代码
package com.firewolf.java.io.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* 作者:刘兴 时间:2019/5/13
**/
public class ChatServer {
private int port; //服务监听端口号
private ServerSocketChannel serverSocketChannel = null; //服务端Socket
private Selector selector = null; //多路复用器
//客户端连接Channel列表
Map<String, SocketChannel> clientChannelMap = new HashMap<>();
public ChatServer(int port) {
this.port = port;
initServer();
}
/**
* 初始化服务,打开监听端口
*/
private void initServer() {
try {
serverSocketChannel = ServerSocketChannel.open();//打开Socket
selector = Selector.open(); //创建多路复用
serverSocketChannel.bind(new InetSocketAddress(port));//绑定端口
serverSocketChannel.configureBlocking(false);//设置非阻塞
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//准备监听客户端接入
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 监听客户端请求
*/
public void listen() {
try {
System.out.println("正在进行监听,监听端口为:" + this.port);
while (true) {
selector.select(); //轮询直到有事件进入
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> handleSelectKey(selectionKey)); //对轮询到的事件进行处理
selectionKeys.clear();//清空轮询了事件
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处理轮询到的事件
*/
private void handleSelectKey(SelectionKey selectionKey) {
try {
if (selectionKey.isValid()) {
if (selectionKey.isAcceptable()) { //有新客户端接入
handleNewClientConnect(selectionKey);
} else if (selectionKey.isReadable()) { //有客户端写入数据
handleClientMessage(selectionKey);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处理新客户端接入
*/
private void handleNewClientConnect(SelectionKey selectionKey) throws IOException {
ServerSocketChannel ss = (ServerSocketChannel) selectionKey.channel();
SocketChannel client = ss.accept();
System.out.println("有新的客户端接入.....");
String address = client.getRemoteAddress().toString(); //客户端的地址
clientChannelMap.put(address, client); //保存客户端的请求
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);//准备读取数据
}
/**
* 读取客户端发送的信息,然后进行转发
*/
private void handleClientMessage(SelectionKey selectionKey) {
try {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int bytes = channel.read(readBuffer);
if (bytes > 0) {
readBuffer.flip();
String message = new String(readBuffer.array(), "UTF-8");
System.out.println("有客户端发送消息为:" + message);
//转发消息,正常聊天程序会发送给对应的用户(这个信息是携带在message里面的),这里简单的群发给所有人
dispathMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 转发信息
*/
public void dispathMessage(String message) {
clientChannelMap.values().forEach(client -> {
ByteBuffer writeBuffer = ByteBuffer.allocate(message.getBytes().length);
writeBuffer.put(message.getBytes());
writeBuffer.flip();
try {
client.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) {
ChatServer server = new ChatServer(8080);
server.listen();
}
}
2. 客户端
a. 时序图
b. 客户端代码
package com.firewolf.java.io.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
import java.util.Set;
/**
* 作者:刘兴 时间:2019/5/14 聊天程序客户端
**/
public class ChatClient {
private Selector selector = null;
private SocketChannel socketChannel = null;
public ChatClient(String host, int port) {
connectServer(host, port);
}
private void connectServer(String host, int port) {
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress(host, port));//连接客户端
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> handleSelectionKey(selectionKey));
selectionKeys.clear();//清空事件
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 处理被轮询到的事件
*/
private void handleSelectionKey(SelectionKey selectionKey) {
if (selectionKey.isValid()) {
if (selectionKey.isConnectable()) { //连接就绪事件
handleConnection(selectionKey);
} else if (selectionKey.isReadable()) { //有信息从服务端发过来
readMessage(selectionKey);
}
}
}
/**
* 连接就绪,准备接受用户的输入
*/
private void handleConnection(SelectionKey selectionKey) {
try {
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.isConnectionPending()) {
System.out.println("连接已经就绪....");
channel.finishConnect();
//启动线程监听客户端输入信息
new Thread(new Runnable() {
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
while (true) {
String message = scanner.nextLine();
ByteBuffer writeBuffer = ByteBuffer.allocate(message.getBytes().length);
writeBuffer.put(message.getBytes());
writeBuffer.flip();
try {
channel.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}).start();
channel.register(selector,SelectionKey.OP_READ);//注册读消息事件
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 读取服务端发送的消息
*/
public void readMessage(SelectionKey selectionKey) {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int bytes = socketChannel.read(readBuffer);
if (bytes > 0) {
readBuffer.flip();
System.out.println(new String(readBuffer.array(), "UTF-8"));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new ChatClient("127.0.0.1", 8080);
}
}
启动服务端后,可以启动多个客户端,能够进行群聊操作。
NIO的优点在于可以进行异步处理了,但是编程的复杂度过高,不太容易编程,更难处理产生的一些bug。
四、AIO
(一)AIO简介
NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供一下两种方式获取操作结果:
- 通过java.util.concurrent.Future类来表示异步操作的结果,使用get()方法来获取结果
- 在执行异步操作的时候传入一个java.nio.channels.CompletionHandler接口的实现类来作为异步操作结果的方法回调。
NIO2.0的异步套接字是真正的非阻塞I/O,对应于Unix网络编程中的事件驱动I/O(AIO),他不需要多路复用器Selector对注册的通道进行轮循操作即可实现异步操作,从而简化了NIO的模型。
(二)编程示例
1. 服务端
package com.firewolf.java.io.aio.server;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AIOServer {
static final int PORT = 30000;
final static String UTF_8 = "utf-8";
//保存客户端请求连接
static List<AsynchronousSocketChannel> channelList = new ArrayList<>();
public void startListen() throws InterruptedException, Exception {
// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(20);
// 以指定线程池来创建一个AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
// 以指定线程池来创建一个AsynchronousServerSocketChannel
AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(channelGroup)
// 指定监听本机的PORT端口
.bind(new InetSocketAddress(PORT));
// 使用CompletionHandler接受来自客户端的连接请求
serverChannel.accept(null, new AcceptHandler(serverChannel)); // ①
Thread.sleep(Long.MAX_VALUE); //睡眠线程,不让线程结束,实战中不需要
}
public static void main(String[] args) throws Exception {
AIOServer server = new AIOServer();
server.startListen();
}
}
// 实现自己的CompletionHandler类
class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private AsynchronousServerSocketChannel serverChannel;
public AcceptHandler(AsynchronousServerSocketChannel sc) {
this.serverChannel = sc;
}
// 定义一个ByteBuffer准备读取数据
ByteBuffer buff = ByteBuffer.allocate(1024);
// 当实际IO操作完成时候触发该方法
@Override
public void completed(final AsynchronousSocketChannel sc, Object attachment) {
// 记录新连接的进来的Channel
AIOServer.channelList.add(sc);
// 准备接受客户端的下一次连接
serverChannel.accept(null, this);
sc.read(buff, null, new CompletionHandler<Integer, Object>() // ②
{
@Override
public void completed(Integer result, Object attachment) {
buff.flip();
// 将buff中内容转换为字符串
String content = StandardCharsets.UTF_8.decode(buff).toString();
// 遍历每个Channel,将收到的信息写入各Channel中
for (AsynchronousSocketChannel c : AIOServer.channelList) {
try {
c.write(ByteBuffer.wrap(content.getBytes(AIOServer.UTF_8))).get();
} catch (Exception ex) {
ex.printStackTrace();
}
}
buff.clear();
// 读取下一次数据
sc.read(buff, null, this);
}
@Override
public void failed(Throwable ex, Object attachment) {
System.out.println("读取数据失败: " + ex);
// 从该Channel读取数据失败,就将该Channel删除
AIOServer.channelList.remove(sc);
}
});
}
@Override
public void failed(Throwable ex, Object attachment) {
System.out.println("连接失败: " + ex);
}
}
2. 客户端
package com.firewolf.java.io.aio.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AIOClient {
final static String UTF_8 = "utf-8";
final static int PORT = 30000;
// 与服务器端通信的异步Channel
AsynchronousSocketChannel clientChannel;
/**
1. 监听用户输入信息
*/
public void listenUserInput() throws Exception {
Scanner scan = new Scanner(System.in);
while (scan.hasNextLine()) {
clientChannel.write(ByteBuffer.wrap(scan.nextLine().getBytes(UTF_8))).get(); // ①
}
}
/**
2. 连接服务器
*/
public void connect() throws Exception {
// 定义一个ByteBuffer准备读取数据
final ByteBuffer buff = ByteBuffer.allocate(1024);
// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(80);
// 以指定线程池来创建一个AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
// 以channelGroup作为组管理器来创建AsynchronousSocketChannel
clientChannel = AsynchronousSocketChannel.open(channelGroup);
// 让AsynchronousSocketChannel连接到指定IP、指定端口
clientChannel.connect(new InetSocketAddress("127.0.0.1", PORT)).get();
// jta.append("---与服务器连接成功---\n");
System.out.println("---与服务器连接成功---");
buff.clear();
clientChannel.read(buff, null, new CompletionHandler<Integer, Object>() // ②
{
@Override
public void completed(Integer result, Object attachment) {
buff.flip();
// 将buff中内容转换为字符串
String content = StandardCharsets.UTF_8.decode(buff).toString();
// 显示从服务器端读取的数据
// jta.append("某人说:" + content + "\n");
System.out.println("某人说:" + content);
buff.clear();
//读取下一次数据
clientChannel.read(buff, null, this);
}
@Override
public void failed(Throwable ex, Object attachment) {
System.out.println("读取数据失败: " + ex);
}
});
}
public static void main(String[] args) throws Exception {
AIOClient client = new AIOClient();
client.connect();
client.listenUserInput();
}
}
五、Netty
在上面的几种IO模型里面,阻塞I/O和伪异步IO的效率都比较低,并发能力太弱,NIO采用了异步非阻塞编程模型,而且是一个I/O线程可以处理多条链路,它的调试和跟踪非常麻烦,特别是生成环境,我们都无法进行有效的调试和追踪,因此,在实际开发中使用多有不便。
(一)不选择原生NIO编程的原因
- NIO的API和类库繁杂,使用麻烦
- 需要具备其他的技术作为自称,如Java多线程
- 可靠性能力弱,工作量和难度都比较大,如客户端面临的断连重连,半包读写、失败缓存、网络闪断等等问题
- JDK IO的BUG,例如臭名昭著的epoll bug,会导致selector空轮询,最终导致CPU 100%.
(二)选择Netty的原因
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性都是同类框架中首屈一指的,已经经过了成百上千的商用项目验证,例如Hadoop的RPC框架Avro就使用了Netty,其他也有很多的RPC框架也使用Netty来进行构建。Netty具有如下优点:
- API使用简单,开发门槛低
- 功能强大,预置了很多编码解码功能
- 定制能力强,可以对通讯框架进行很多的扩展
- 性能高,通过与其他的NIO框架对比,Netty的综合性能最高
- 成熟,稳定,Netty已经修复了NIO的epoll Bug
- 经历了很多大型商业应用的考验,质量有保证
- 社区活跃,版本迭代周期短,发现的Bug能得到及时修复