每日一句
Talk is cheap. Show me the code. —— Linus Torvalds
上一篇文章咱们对Netty和几种IO模型做了初步的介绍,但其实都是一些理论性的内容,很枯燥,但即使如此我这还是强行加了不少的戏,就是怕写出来读完感觉太枯燥晦涩。没办法,有些技术确实就是这样的。
其实无论是同步/异步,还是阻塞非阻塞的IO模型,它们都是客观存在的一个东西,自从操作系统的发明以来,它就是天生携带着这几个事物共同诞生的,无论讲述者是用“烧开水”、“等厕所”、“送收快递”等等例子来描述,还是像我这样用直接生硬的,略显官方的文字来表达,本质是一样的,第一次学这个东西你肯定也都不好理解,我也曾经想过用一些生活中常见的场景例子来引出这些概念,后来转念一想还是算了,一来是怕越描越歪,误人子弟,二来我是怕万一我的例子过于生动形象,导致读者只记住了等厕所,没记住阻塞非阻塞,那就适得其反了。(可能我想多了)
好了,书归正传。毕竟计算机归根结底是一门实践性学科,这一篇咱们争取只捞干的。把几种IO模型的服务端代码示例都写出来,我负责写一写,跑一跑,你负责看一看,想一想,咱们共同进步,各得其所。
BIO
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author 晴天听夜曲
*/
public class BioServerDemo {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(5678);
System.out.println("======serverSocket创建成功,服务器启动======");
AtomicLong atomicLong = new AtomicLong(0L);
while (true) {
long num = atomicLong.get();
atomicLong.getAndIncrement();
System.out.println(">>>主线程: ======准备接收客户端的连接======");
// accept方法会一直阻塞,直到一个新客户端对5678端口发起连接,这一行代码才会往下执行
final Socket socket = serverSocket.accept();
System.out.println(">>>主线程: ======收到了一份客户端的请求======");
System.out.println(">>>主线程: ======ip是:" + socket.getInetAddress().getHostAddress());
System.out.println(">>>主线程: ======端口是:" + socket.getPort());
new Thread(() -> handler(socket)).start();
}
}
public static void handler(Socket socket) {
System.out.println("###子线程: 服务端正在与" + socket.getPort() + "端口进行交互");
System.out.println("###子线程: 当前处理任务的线程为: " + Thread.currentThread().getName());
while (true) {
byte[] bytes = new byte[4096];
try {
// 某一条线程正在处理
InputStream inputStream = socket.getInputStream();
System.out.println("###子线程: 准备读取客户端发来的消息...");
// 这一行代码也会阻塞,直到客户端发来数据
// 经历了网卡 --> 内核 --> 最后拷贝给这个JAVA进程,这行代码才会得到数据
int read = inputStream.read(bytes);
if (read == -1) {
System.out.println("###子线程: 客户端发起了关闭请求");
socket.close();
break;
}
String s = new String(bytes, 0, read);
System.out.println("###子线程: 打印客户端发来的消息: " + s);
// 为了演示,加一些无用的逻辑
if ("sayHi".equals(s)) {
// 如果客户端发来的刚好是hello bio这句话,那就回应一句话
OutputStream outputStream = socket.getOutputStream();
outputStream.write("hello client.".getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
阻塞式IO的代码写出来也比较简单,核心功能基本都抽象到ServerSocket类的API里边了,大体的编程逻辑就是监听指定端口,接收客户端的连接请求,一般新请求来临时都会使用开辟新线程的方式来处理读写。它的阻塞都体现在serverSocket.accept(),与inputStream.read(bytes)这两处了。那我又是怎么知道它是阻塞的这件事呢?而且还专门阻塞在这2个方法处了呢?其实不用怀疑也不用惊讶,这些知识的来源不是什么付费课程,也不是我无师自通,都是通过看javadoc了解到的。
/**
* Listens for a connection to be made to this socket and accepts
* it. The method blocks until a connection is made.
*
* <p>A new Socket {@code s} is created and, if there
* is a security manager,
* the security manager's {@code checkAccept} method is called
* with {@code s.getInetAddress().getHostAddress()} and
* {@code s.getPort()}
* as its arguments to ensure the operation is allowed.
* This could result in a SecurityException.
*
* @exception IOException if an I/O error occurs when waiting for a
* connection.
* @exception SecurityException if a security manager exists and its
* {@code checkAccept} method doesn't allow the operation.
* @exception SocketTimeoutException if a timeout was previously set with setSoTimeout and
* the timeout has been reached.
* @exception java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, the channel is in
* non-blocking mode, and there is no connection ready to be
* accepted
*
* @return the new Socket
* @see SecurityManager#checkAccept
* @revised 1.4
* @spec JSR-51
*/
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
复制代码
官方注释写的很清楚,胜过了所有的其他材料。
演示效果
我使用的调试工具是sokit,是一个专门用于调试TCP/UDP协议数据的工具,和postman同属一科。
NIO
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
/**
* @author 晴天听夜曲
*/
public class NioServerDemo {
private Selector selector;
public static void main(String[] args) throws IOException {
NioServerDemo client = new NioServerDemo();
// 初始化动作
client.initServer(5680);
client.listen();
}
public void initServer(int port) throws IOException {
// 获得一个ServerSocket通道
ServerSocketChannel channel = ServerSocketChannel.open();
// 设置为非阻塞
channel.configureBlocking(false);
// 获得一个通道管理器
this.selector = Selector.open();
// 通道绑定主机端口
channel.bind(new InetSocketAddress(port));
// 注册通道
channel.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 采用轮询的方式监听selector上是否有需要处理的对象,如果有,则进行处理
*/
public void listen() throws IOException {
SocketChannel socketChannel = null;
// 轮询访问Selector
while (true) {
// 选择一组可以进行I/O的操作的事件,放在selector中,会产生阻塞
selector.select();
// 获得selector中选中的项的迭代器
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = ite.next();
// 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。
ite.remove();
// 接收事件的发生
if (key.isAcceptable()) {
System.out.println("接收到客户端连接请求");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 接收请求
socketChannel = serverSocketChannel.accept();
// 设置成非阻塞
socketChannel.configureBlocking(false);
// 和服务器端连接成功后,为了可以接受到服务端的信息,需要给通道设置读的权限
socketChannel.register(this.selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
read(key);
}
}
}
}
/**
* 处理读取客户端发来的信息
*/
public void read(SelectionKey key) throws IOException {
// 服务器可读取消息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(4096);
channel.read(buffer);
byte[] data = buffer.array();
int sum = 0;
for (byte datum : data) {
sum += datum;
}
if (sum == 0) {
key.cancel();
return;
}
String msg = new String(data).trim();
System.out.println(">>>收到客户端消息:" + msg);
if (msg.equals("sayHi")) {
String reback = "hello,client, I am NIO server..";
ByteBuffer outBuffer = ByteBuffer.wrap(reback.getBytes(StandardCharsets.UTF_8));
// 将消息回送给客户端
channel.write(outBuffer);
}
}
}
复制代码
NIO三大金刚
Buffer
NIO,即非阻塞IO,首先它新引入了ByteBuffer这一概念,通过加入缓冲区这个中间层,解耦开了业务层与网卡层的数据交互问题,不用我们亲自去各种Stream中获取了,缓冲区实际上是一个数组,我们读取数据,是指把数据读取到缓冲区中,写入数据时,也是写入到缓冲区中,任何时候访问NIO中的数据,都是对缓冲区做的操作,ByteBuffer提供了很多有用的操作数据的API,具体看javadoc即可。
Channel
另外一个重要概念就是Channel通道,网络数据通过Channel读取和写入,通道与流的不同之处在于通道是双向的,可以同时进行读写操作;而流是单向的,只能在一个方向上移动,这点也好理解,用结论去推理条件即可,因为我们学习java的时候只见过输入流或者输出流,从没见过“出入流”,而出入流根本也是不存在的。谁要是强行说存在出入流,您就对他说你懂个球?
Selector
Selector即多路复用器,是NIO编程模型的重中之重,多路复用器提供了选择已就绪任务的能力,它会不断的轮询注册在其上边的Channel,如果某个Channel上发生了读/写/连接事件,就会被Selector识别出来,然后通过SelectionKey可以获取到就绪的Channel集合(注意是集合,就是说一次轮询它可以识别到多条通道上的事件),进行后续的IO操作和我们自定义的代码。它的底层是靠操作系统提供的epoll()函数实现的,通常的编程模型是由一条线程负责多路复用器轮询,识别到事件之后开启新线程做具体处理操作。
演示效果
AIO
import lombok.SneakyThrows;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
/**
* 晴天听夜曲
*/
public class AioServerDemo {
public static void main(String[] args) {
try {
// 开启异步IO通道
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
// 绑定端口
channel.bind(new InetSocketAddress(5679));
// 回调处理类
AcceptHandler acceptHandler = new AcceptHandler(channel);
// 等待连接,这行代码不会阻塞,执行完就完事,连接到来的时候会回调acceptHandler里边的completed方法
channel.accept(null, acceptHandler);
// 让主线程保持存活,避免执行到此处就退出了进程
while (true) {
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
private AsynchronousServerSocketChannel channel;
public AcceptHandler(AsynchronousServerSocketChannel channel) {
this.channel = channel;
}
@SneakyThrows
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
System.out.println("连接建立完成");
// 异步处理连接
asyncHandle(result);
// 继续监听accept
channel.accept(null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("accept failed");
}
}
private static void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
ByteBuffer dst = ByteBuffer.allocate(4096);
// 读取事件继续使用回调模式
asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() {
@SneakyThrows
@Override
public void completed(Integer result, Void attachment) {
if (result > 0) {
dst.flip();
String msg = byteBuffer2String(dst);
System.out.println(">>>客户端发送的消息是: " + msg);
dst.clear();
if ("sayHi".equals(msg)) {
asyncSocketChannel.write(ByteBuffer.wrap("hello,client".getBytes(StandardCharsets.UTF_8)));
}
}
// 注册回调,继续读取输入
asyncSocketChannel.read(dst, null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println(exc.getMessage());
}
});
}
public static String byteBuffer2String(ByteBuffer writeBuffer) {
int limit = writeBuffer.limit();
byte[] bs = new byte[limit];
int i = 0;
while (writeBuffer.hasRemaining()) {
bs[i++] = writeBuffer.get();
}
return new String(bs);
}
}
复制代码
演示效果
作为NIO的改进和增强,随JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被称作是NIO2.0。AIO提供了从建立连接到读、写的全异步操作能力。但是随之而来的也对编码人员要求更高,首先要理解纯异步的理念,还有就是要对ByteBuffer数据容器和回调注册机制都要熟悉。否则AIO的代码还真是不太好写。
阶段性总结
- 1)BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,必须得1请求1线程来处理,并发性很受资源的局限,它是JDK1.4以前的唯一选择,但程序直观简单易理解;
- 2)NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,编程比较复杂,JDK1.4开始支持;
- 3)AIO方式使用于连接数目多且连接比较长(重操作)的架构,需要充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
接下来的路
虽然本篇文章代码很多,但如果您想更透彻的学好这部分知识,一两篇博文,两三段demo代码是远远不够的,建议您可以专门找一本书籍系统化的来阅读,在学习与实践的过程中不断思考和总结,只有当有了一定的积累之后,相互割裂的知识才会自然的联系起来,最终形成您无限的火力,到时各种各样的网络问题在您面前就将再无死角。
好了,本篇文章到这里就结束了,希望您能有所收获,祝您生活愉快。